In [None]:
# Refresh the apt package index, then install a headless OpenJDK 11 build.
# The install output is discarded; JAVA_HOME is pointed at this JDK in a later cell.
!apt-get update -qq
!apt-get install -y openjdk-11-jdk-headless -qq > /dev/null

W: Skipping acquire of configured file 'main/source/Sources' as repository 'https://r2u.stat.illinois.edu/ubuntu jammy InRelease' does not seem to provide it (sources.list entry misspelt?)


In [None]:
# Download the Spark 3.5.1 (Hadoop 3) binary distribution and unpack it in the working directory.
!wget -q https://archive.apache.org/dist/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3.tgz
!tar xf spark-3.5.1-bin-hadoop3.tgz
# Install the Python dependencies quietly. NOTE(review): versions are not pinned.
!pip install -q findspark pyspark opencv-python numpy pandas moviepy ultralytics scipy

[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/1.1 MB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m [32m1.1/1.1 MB[0m [31m34.8 MB/s[0m eta [36m0:00:01[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.1/1.1 MB[0m [31m21.6 MB/s[0m eta [36m0:00:00[0m
[?25h

In [None]:
# Second install pass: adds deep-sort-realtime and supervision. The remaining packages
# repeat the earlier install cell. NOTE(review): versions are not pinned.
!pip install -q findspark pyspark opencv-python numpy pandas scipy ultralytics deep-sort-realtime supervision

[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m8.4/8.4 MB[0m [31m34.9 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m207.2/207.2 kB[0m [31m10.9 MB/s[0m eta [36m0:00:00[0m
[?25h

In [None]:
import os
# Point the process environment at the JDK and the unpacked Spark distribution.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-11-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.5.1-bin-hadoop3",
})

In [None]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [None]:
# Create (or reuse) a local Spark session that uses every available core.
spark = (
    SparkSession.builder
    .appName("AthleteSpeedAnalysis")
    .master("local[*]")
    .getOrCreate()
)

In [None]:
# Keep a handle on the session's SparkContext and print the Spark version as a start-up check.
sc = spark.sparkContext
print("Spark initialized:", spark.version)

Spark initialized: 3.5.1


In [None]:
import cv2
import numpy as np
import pandas as pd
from scipy.spatial import distance
import supervision as sv
from ultralytics import YOLO
from deep_sort_realtime.deepsort_tracker import DeepSort
from google.colab.patches import cv2_imshow
from google.colab import drive

Creating new Ultralytics Settings v0.0.6 file ✅ 
View Ultralytics Settings with 'yolo settings' or at '/root/.config/Ultralytics/settings.json'
Update Settings with 'yolo settings key=value', i.e. 'yolo settings runs_dir=path/to/dir'. For help see https://docs.ultralytics.com/quickstart/#ultralytics-settings.


In [None]:
# Mount Google Drive; Config.VIDEO_PATH below points at a file under /content/drive.
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
class Config:
    """Tunable settings for the athlete speed pipeline, read through the module-level ``config`` instance."""

    # Input clip opened by process_video_comprehensive() and manual_calibration().
    VIDEO_PATH = "/content/drive/MyDrive/Big_data_Project/clip-1.mp4"
    # Destination of the annotated video written by process_video_comprehensive().
    OUTPUT_PATH = "/content/athlete_speed_output.mp4"
    # Passed to SpeedCalculator as its track_length.
    TRACK_LENGTH = 100  # meters
    # NOTE(review): not read anywhere in the visible code; manual_calibration() hard-codes 1.22.
    LANE_WIDTH = 1.22   # standard lane width in meters
    # Confidence threshold handed to AthleteDetector.detect_athletes().
    MIN_CONFIDENCE = 0.5
    # Passed to DeepSort as max_age.
    MAX_AGE = 50
    # NOTE(review): not read anywhere in the visible code.
    NMS_THRESHOLD = 0.4

# Shared settings instance used by the processing functions.
config = Config()

In [None]:
class SpeedCalculator:
    """Estimate per-athlete running speed from tracked pixel positions.

    For every athlete id the calculator keeps a bounded history of positions,
    frame numbers, raw speeds and smoothed speeds in ``athlete_data``. Pixel
    displacement is converted to meters through ``pixels_per_meter`` and to
    seconds through ``frame_rate``; results are stored in km/h.
    """

    # Maximum number of positions / frame numbers / raw speeds kept per athlete.
    _MAX_HISTORY = 50
    # Maximum number of smoothed speeds kept per athlete.
    _MAX_SPEEDS = 10
    # Number of most recent raw speeds averaged into one smoothed speed.
    _SMOOTHING_SAMPLES = 3
    # Speed is measured against the position recorded this many samples earlier.
    _SAMPLE_LOOKBACK = 2
    # Raw speeds at or above this value (km/h) are discarded as tracking noise.
    _MAX_REALISTIC_KMPH = 45

    def __init__(self, track_length=100):
        """Create a calculator for a track of ``track_length`` meters."""
        self.track_length = track_length
        self.athlete_data = {}
        self.frame_rate = 25  # Will be updated from video
        self.pixels_per_meter = None
        # NOTE(review): kept for compatibility; the smoothing below averages the
        # last 3 raw speeds and does not consult this attribute.
        self.speed_smoothing_window = 5  # Number of frames for smoothing

    def set_frame_rate(self, fps):
        """Set the frame rate (frames per second) used to turn frame gaps into seconds."""
        self.frame_rate = fps

    def estimate_pixels_per_meter(self, frame_width, frame_height):
        """Estimate pixels per meter based on typical race video setup.

        Assumes the track spans 75% of ``frame_width``; ``frame_height`` is
        accepted but not used. Stores and returns the estimate.
        """
        # For 100m race videos, track typically occupies 70-80% of frame width
        estimated_track_pixels = frame_width * 0.75
        self.pixels_per_meter = estimated_track_pixels / self.track_length
        print(f"Estimated pixels per meter: {self.pixels_per_meter:.2f}")
        return self.pixels_per_meter

    def update_athlete_position(self, athlete_id, position, frame_number):
        """Record a new position for ``athlete_id`` and update its speed history.

        Args:
            athlete_id: Hashable identifier of the tracked athlete.
            position: ``(x, y)`` pixel coordinates for this frame.
            frame_number: Index of the frame the position was observed in.

        The position is always recorded. A speed sample is added only when at
        least three positions exist, a calibration and a positive frame rate
        are available, the frame gap is positive and the resulting speed is
        below the realism limit.
        """
        if athlete_id not in self.athlete_data:
            self.athlete_data[athlete_id] = {
                'positions': [],
                'frame_numbers': [],
                'speeds': [],
                'raw_speeds': [],  # Store raw speeds for debugging
                'distances': [0]   # never updated by this class; kept for compatibility
            }

        data = self.athlete_data[athlete_id]
        data['positions'].append(position)
        data['frame_numbers'].append(frame_number)

        # Keep only recent history to prevent memory buildup
        if len(data['positions']) > self._MAX_HISTORY:
            data['positions'] = data['positions'][-self._MAX_HISTORY:]
            data['frame_numbers'] = data['frame_numbers'][-self._MAX_HISTORY:]

        # Need the current sample plus the one _SAMPLE_LOOKBACK entries earlier.
        needed = self._SAMPLE_LOOKBACK + 1
        if len(data['positions']) < needed:
            return

        # Without a calibration or a positive frame rate no speed can be
        # derived (this also avoids dividing by a zero frame rate).
        if not self.pixels_per_meter or not self.frame_rate or self.frame_rate <= 0:
            return

        frames_elapsed = data['frame_numbers'][-1] - data['frame_numbers'][-needed]
        if frames_elapsed <= 0:
            return

        current_pos = np.array(position)
        prev_pos = np.array(data['positions'][-needed])
        pixel_distance = distance.euclidean(current_pos, prev_pos)

        # Convert to real distance and elapsed time
        real_distance = pixel_distance / self.pixels_per_meter
        time_elapsed = frames_elapsed / self.frame_rate
        speed_kmph = (real_distance / time_elapsed) * 3.6

        # Filter unrealistic speeds (world record pace is about 37-38 km/h for 100m)
        if not (speed_kmph < self._MAX_REALISTIC_KMPH):
            return

        raw_speeds = data['raw_speeds']
        raw_speeds.append(speed_kmph)
        # Bound the raw history as well; only the newest few samples are read.
        if len(raw_speeds) > self._MAX_HISTORY:
            del raw_speeds[:-self._MAX_HISTORY]

        # Apply moving average smoothing once enough raw samples exist
        if len(raw_speeds) >= self._SMOOTHING_SAMPLES:
            smoothed_speed = np.mean(raw_speeds[-self._SMOOTHING_SAMPLES:])
            data['speeds'].append(smoothed_speed)

            # Keep speeds list manageable
            if len(data['speeds']) > self._MAX_SPEEDS:
                data['speeds'] = data['speeds'][-self._MAX_SPEEDS:]

    def get_current_speed(self, athlete_id):
        """Return the latest smoothed speed (km/h) for ``athlete_id``.

        Returns ``0.0`` when the athlete is unknown, has no smoothed speed yet,
        or the latest value exceeds the realism limit.
        """
        data = self.athlete_data.get(athlete_id)
        if not data or not data['speeds']:
            return 0.0

        current_speed = data['speeds'][-1]
        # Ensure speed is realistic
        if current_speed <= self._MAX_REALISTIC_KMPH:
            return current_speed
        return 0.0


In [None]:
def manual_calibration(video_path, lane_width_m=1.22, lane_width_fraction=0.1):
    """Derive a pixels-per-meter scale from the first frame of a video.

    Despite the name, no user interaction takes place: the function assumes
    one lane spans ``lane_width_fraction`` of the frame width and divides that
    pixel width by ``lane_width_m``.

    Args:
        video_path: Path of the video to open.
        lane_width_m: Real-world lane width in meters (defaults to 1.22).
        lane_width_fraction: Fraction of the frame width assumed to be covered
            by one lane (defaults to 0.1).

    Returns:
        The estimated pixels-per-meter value, or ``None`` when no frame could
        be read from the video.
    """
    cap = cv2.VideoCapture(video_path)
    try:
        ret, frame = cap.read()
    finally:
        # Always free the capture handle, including when no frame was read.
        cap.release()

    if not ret:
        return None

    print("\n=== MANUAL CALIBRATION ===")
    print("For accurate speed measurement, we need to calibrate.")
    print("The standard 100m track will be used.")

    width = frame.shape[1]

    # Estimate based on typical camera positioning
    lane_width_pixels = width * lane_width_fraction  # Approximate lane width in pixels
    pixels_per_meter = lane_width_pixels / lane_width_m

    print(f"Using calibration: {pixels_per_meter:.2f} pixels/meter")
    print(f"This assumes standard lane width of {lane_width_m}m")

    return pixels_per_meter

In [None]:
class AthleteDetector:
    """Person detector backed by the ``yolov8s.pt`` YOLO weights."""

    def __init__(self):
        # Load YOLO model
        self.model = YOLO("yolov8s.pt")

    def detect_athletes(self, frame, conf_threshold=0.5):
        """Run the model on ``frame`` and keep sufficiently large person boxes.

        Args:
            frame: Image handed to the model as-is.
            conf_threshold: Forwarded to the model and re-applied as a strict
                lower bound on each box confidence.

        Returns:
            ``(detections, confidences)`` where each detection is
            ``[x, y, width, height]`` and ``confidences`` is the parallel list
            of confidence values.
        """
        kept_boxes = []
        kept_scores = []

        for prediction in self.model(frame, conf=conf_threshold, verbose=False):
            for candidate in prediction.boxes:
                score = candidate.conf.item()
                label = self.model.names[int(candidate.cls.item())]

                # Only confident 'person' detections are of interest.
                if not (label == 'person' and score > conf_threshold):
                    continue

                left, top, right, bottom = candidate.xyxy[0].tolist()
                box_w = right - left
                box_h = bottom - top

                # Drop boxes that are not taller than 100 px and wider than 30 px.
                if not (box_h > 100 and box_w > 30):
                    continue

                kept_boxes.append([left, top, box_w, box_h])
                kept_scores.append(score)

        return kept_boxes, kept_scores

def create_track_mask(frame_shape):
    """Build a single-channel uint8 mask that is 255 inside the assumed track region.

    ``frame_shape`` supplies ``(height, width)`` in its first two entries. The
    region runs from 10% to 90% of the width and from 30% to 95% of the
    height; everything outside it stays 0.
    """
    rows, cols = frame_shape[:2]

    # Corner points of the track region (adjust based on your video)
    top_left = (int(cols * 0.1), int(rows * 0.3))
    bottom_right = (int(cols * 0.9), int(rows * 0.95))

    canvas = np.zeros((rows, cols), dtype=np.uint8)
    cv2.rectangle(canvas, top_left, bottom_right, 255, -1)  # -1 => filled
    return canvas

def generate_distinct_color(track_id):
    """Map a track ID to a fully saturated BGR color.

    Args:
        track_id: Integer ID, or a string. Numeric strings are converted with
            ``int``; any other string is reduced to a number with CRC-32 so
            the resulting color is the same on every run.

    Returns:
        A ``(b, g, r)`` tuple of Python ints.
    """
    if isinstance(track_id, str):
        try:
            track_id = int(track_id)
        except ValueError:
            # Built-in hash() is salted per process for strings, which would
            # give a different color on every run; CRC-32 is stable.
            import zlib
            track_id = zlib.crc32(track_id.encode("utf-8")) % 1000

    # Step the hue by 50 per ID; OpenCV's 8-bit hue range is 0-179.
    hue = (track_id * 50) % 180
    saturation = 255
    value = 255

    # Convert the single HSV pixel to BGR
    hsv_color = np.uint8([[[hue, saturation, value]]])
    bgr_color = cv2.cvtColor(hsv_color, cv2.COLOR_HSV2BGR)
    color = tuple(map(int, bgr_color[0][0]))

    return color

def process_video_comprehensive():
    """Detect, track and annotate athletes in the configured video, then analyze with Spark.

    Reads ``config.VIDEO_PATH`` frame by frame, detects people inside the track
    mask, tracks them with DeepSORT, estimates each athlete's speed and writes
    an annotated copy of the video to ``config.OUTPUT_PATH``. The per-frame
    records are then passed to ``analyze_with_spark``.

    Returns:
        ``config.OUTPUT_PATH`` on success, or ``None`` when the input video
        cannot be opened.
    """

    # Initialize components
    detector = AthleteDetector()
    tracker = DeepSort(
        max_age=config.MAX_AGE,
        n_init=3,
        max_cosine_distance=0.4,
        nms_max_overlap=0.8
    )
    speed_calc = SpeedCalculator(config.TRACK_LENGTH)

    # Video setup
    cap = cv2.VideoCapture(config.VIDEO_PATH)
    if not cap.isOpened():
        print("Error: Could not open video")
        cap.release()
        return None

    out = None
    frame_count = 0

    # Store data for Spark analysis (one record per processed frame)
    spark_data = []

    try:
        # Get video properties
        frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fps = cap.get(cv2.CAP_PROP_FPS)
        if not fps or fps <= 0:
            # A missing frame rate would make the timestamp division below
            # fail; fall back to the calculator's default.
            fps = speed_calc.frame_rate
            print(f"Warning: video reports no frame rate; assuming {fps} FPS")
        speed_calc.set_frame_rate(fps)

        # Estimate pixels per meter
        pixels_per_meter = manual_calibration(config.VIDEO_PATH)
        if pixels_per_meter:
            speed_calc.pixels_per_meter = pixels_per_meter
            print(f"Using manual calibration: {pixels_per_meter:.2f} pixels/meter")
        else:
            speed_calc.estimate_pixels_per_meter(frame_width, frame_height)

        # Create track mask
        track_mask = create_track_mask((frame_height, frame_width))

        # Output video setup
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(config.OUTPUT_PATH, fourcc, fps, (frame_width, frame_height))

        print(f"Video Info: {frame_width}x{frame_height}, {fps} FPS")
        print("Starting athlete detection and tracking...")

        athlete_colors = {}

        while True:
            ret, frame = cap.read()
            if not ret:
                break

            # Apply track mask to reduce background noise
            masked_frame = cv2.bitwise_and(frame, frame, mask=track_mask)

            # Detect athletes
            detections, confidences = detector.detect_athletes(
                masked_frame, config.MIN_CONFIDENCE
            )

            # Format detections for DeepSORT: ([x, y, w, h], confidence, class label)
            detection_list = [
                ([x, y, w, h], conf, 'athlete')
                for (x, y, w, h), conf in zip(detections, confidences)
            ]

            # Update tracker (the unmasked frame is handed to the tracker)
            tracks = tracker.update_tracks(detection_list, frame=frame)

            # Process tracked athletes
            current_frame_data = {'frame': frame_count, 'timestamp': frame_count/fps, 'athletes': {}}

            for track in tracks:
                if not track.is_confirmed():
                    continue

                track_id = track.track_id
                ltrb = track.to_ltrb()
                x1, y1, x2, y2 = map(int, ltrb)

                # Calculate centroid
                centroid_x = (x1 + x2) // 2
                centroid_y = (y1 + y2) // 2
                centroid = (centroid_x, centroid_y)

                # Update speed calculation
                speed_calc.update_athlete_position(track_id, centroid, frame_count)
                current_speed = speed_calc.get_current_speed(track_id)

                # Store for Spark analysis
                current_frame_data['athletes'][str(track_id)] = {
                    'position': centroid,
                    'speed_kmph': current_speed,
                    'bbox': [x1, y1, x2, y2]
                }

                # Assign consistent color for each athlete
                if track_id not in athlete_colors:
                    athlete_colors[track_id] = generate_distinct_color(track_id)

                color = athlete_colors[track_id]

                # Draw bounding box
                cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)

                # Draw info panel above the box, clamped to the top of the frame.
                # The name `max` is rebound by the notebook's wildcard import of
                # pyspark.sql.functions, so the clamp is spelled out and stays
                # a plain Python int for OpenCV.
                info_bg_y1 = y1 - 60 if y1 >= 60 else 0
                info_bg_y2 = y1
                cv2.rectangle(frame, (x1, info_bg_y1), (x2, info_bg_y2), color, -1)
                cv2.rectangle(frame, (x1, info_bg_y1), (x2, info_bg_y2), color, 2)

                # Display athlete info
                cv2.putText(frame, f"Athlete {track_id}", (x1 + 5, y1 - 40),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)
                cv2.putText(frame, f"Speed: {current_speed:.1f} km/h", (x1 + 5, y1 - 20),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)

                # Draw centroid
                cv2.circle(frame, centroid, 5, color, -1)

                # Draw movement trail through the retained position history
                if track_id in speed_calc.athlete_data:
                    positions = speed_calc.athlete_data[track_id]['positions']
                    for i in range(1, len(positions)):
                        start_pos = tuple(map(int, positions[i-1]))
                        end_pos = tuple(map(int, positions[i]))
                        cv2.line(frame, start_pos, end_pos, color, 2)

            spark_data.append(current_frame_data)

            # Write frame to output
            out.write(frame)
            frame_count += 1

            if frame_count % 50 == 0:
                print(f"Processed {frame_count} frames...")
    finally:
        # Release the capture and writer even if processing failed part-way.
        cap.release()
        if out is not None:
            out.release()
        cv2.destroyAllWindows()

    print(f"Processing complete! Processed {frame_count} frames.")

    # Analyze with Spark
    analyze_with_spark(spark_data, fps)

    return config.OUTPUT_PATH

In [None]:
def analyze_with_spark(athlete_data, fps):
    """Use Spark to analyze athlete performance.

    Args:
        athlete_data: List of per-frame dicts with keys ``'frame'``,
            ``'timestamp'`` and ``'athletes'``; ``'athletes'`` maps an athlete
            id to a dict holding ``'speed_kmph'`` and ``'position'`` (x, y).
        fps: Frame rate of the source video. Accepted for the caller's
            convenience; not used by the analysis itself.

    Prints a data preview, overall statistics, a per-athlete summary and the
    smoothed speed trend of the three athletes with the highest average speed.
    Returns ``None``; prints a notice and returns early when there are no
    athlete records.
    """
    # Namespaced imports: the aggregations below must be the Spark column
    # functions, not whatever `max` / `min` / `count` are bound to globally.
    from pyspark.sql import functions as F
    from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType
    from pyspark.sql.window import Window

    print("\n" + "="*50)
    print("SPARK PERFORMANCE ANALYSIS")
    print("="*50)

    # Prepare data for Spark: one flat record per (frame, athlete)
    spark_records = []
    for frame_data in athlete_data:
        for athlete_id, athlete_info in frame_data['athletes'].items():
            spark_records.append({
                'frame': frame_data['frame'],
                'timestamp': float(frame_data['timestamp']),
                'athlete_id': athlete_id,
                # float() turns NumPy scalars into plain Python floats
                'speed_kmph': float(athlete_info['speed_kmph']),
                'position_x': float(athlete_info['position'][0]),
                'position_y': float(athlete_info['position'][1])
            })

    if not spark_records:
        print("No athlete data to analyze")
        return

    # Define explicit schema
    schema = StructType([
        StructField("frame", IntegerType(), True),
        StructField("timestamp", FloatType(), True),
        StructField("athlete_id", StringType(), True),
        StructField("speed_kmph", FloatType(), True),
        StructField("position_x", FloatType(), True),
        StructField("position_y", FloatType(), True)
    ])

    # Create Spark DataFrame with explicit schema
    df = spark.createDataFrame(spark_records, schema=schema)

    print("Data Overview:")
    df.show(10)

    # Basic statistics
    print("\nOverall Statistics:")
    df.select(
        F.count("athlete_id").alias("total_detections"),
        F.countDistinct("athlete_id").alias("unique_athletes"),
        F.avg("speed_kmph").alias("avg_speed_kmph"),
        F.max("speed_kmph").alias("max_speed_kmph"),
        F.min("speed_kmph").alias("min_speed_kmph")
    ).show()

    # Athlete-specific analysis
    print("\nAthlete Performance Summary:")
    athlete_stats = df.groupBy("athlete_id").agg(
        F.avg("speed_kmph").alias("avg_speed"),
        F.max("speed_kmph").alias("max_speed"),
        F.min("speed_kmph").alias("min_speed"),
        F.count("frame").alias("frames_detected"),
        (F.max("timestamp") - F.min("timestamp")).alias("time_tracked_seconds")
    ).orderBy("avg_speed", ascending=False)

    athlete_stats.show()

    # Speed distribution over time: centered 11-row moving average per athlete
    print("\nSpeed Analysis Over Race:")
    window_spec = Window.partitionBy("athlete_id").orderBy("timestamp")
    speed_analysis = df.withColumn("speed_trend", F.avg("speed_kmph").over(
        window_spec.rowsBetween(-5, 5)
    ))

    # Show speed trends for top athletes
    top_athletes = [row['athlete_id'] for row in athlete_stats.head(3)]
    for athlete in top_athletes:
        athlete_speeds = (
            speed_analysis.filter(F.col("athlete_id") == athlete)
            .select("timestamp", "speed_kmph", "speed_trend")
            .orderBy("timestamp")
        )

        print(f"\nSpeed trend for Athlete {athlete}:")
        athlete_speeds.show(10)

# Main execution
if __name__ == "__main__":
    print("=== Precise Athlete Speed Analysis ===")
    print("Features: Athlete Detection + Tracking + Speed Calculation + Spark Analysis")

    # Process video
    try:
        output_path = process_video_comprehensive()

        if output_path is None:
            # process_video_comprehensive() returns None when the video cannot
            # be opened; there is nothing to report as saved or to download.
            print("No output video was produced; skipping download.")
        else:
            print(f"\n✅ Output video saved: {output_path}")

            # Download result
            from google.colab import files
            files.download(output_path)

    except Exception as e:
        # Show the full traceback in the cell output instead of aborting silently.
        print(f"Error: {e}")
        import traceback
        traceback.print_exc()

=== Precise Athlete Speed Analysis ===
Features: Athlete Detection + Tracking + Speed Calculation + Spark Analysis

=== MANUAL CALIBRATION ===
For accurate speed measurement, we need to calibrate.
The standard 100m track will be used.
Using calibration: 157.38 pixels/meter
This assumes standard lane width of 1.22m
Using manual calibration: 157.38 pixels/meter
Video Info: 1920x1080, 30.0 FPS
Starting athlete detection and tracking...
Processed 50 frames...
Processed 100 frames...
Processed 150 frames...
Processed 200 frames...
Processed 250 frames...
Processed 300 frames...
Processed 350 frames...
Processing complete! Processed 375 frames.

SPARK PERFORMANCE ANALYSIS
Data Overview:
+-----+----------+----------+----------+----------+----------+
|frame| timestamp|athlete_id|speed_kmph|position_x|position_y|
+-----+----------+----------+----------+----------+----------+
|    2|0.06666667|         1|       0.0|     460.0|     628.0|
|    3|       0.1|         1|       0.0|     455.0|     62

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>