In [None]:
# Install the required packages
! pip install shapely 

Collecting shapely
  Downloading shapely-2.1.2-cp312-cp312-manylinux2014_x86_64.manylinux_2_17_x86_64.whl.metadata (6.8 kB)
Downloading shapely-2.1.2-cp312-cp312-manylinux2014_x86_64.manylinux_2_17_x86_64.whl (3.1 MB)
[2K   [90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m [32m3.1/3.1 MB[0m [31m36.5 MB/s[0m  [33m0:00:00[0m
[?25hInstalling collected packages: shapely
Successfully installed shapely-2.1.2


In [50]:
## Import Packages
import os
import random
import numpy as np
from shapely.geometry import Point, Polygon, LineString, box
from shapely.affinity import rotate
import matplotlib.pyplot as plt
from polygenerator import random_convex_polygon

# --- Core Modules (Classes) ---

class Environment:
    """
    Polygonal arena in which the simulation takes place.

    The outline is obtained from ``random_convex_polygon`` and wrapped in a
    shapely ``Polygon``; both the raw coordinates and the polygon are kept.
    """
    def __init__(self, n_vertices=12):
        """
        Build the arena.

        Args:
            n_vertices: Vertex count handed to ``random_convex_polygon``.

        Raises:
            ValueError: If shapely reports the resulting polygon as invalid.
        """
        outline = random_convex_polygon(n_vertices)
        self.polygon_coords = outline
        self.polygon = Polygon(outline)
        if self.polygon.is_valid:
            print(f"üó∫Ô∏è  Environment created with {n_vertices} vertices.")
        else:
            raise ValueError("The generated polygon is invalid.")

    def get_bounds(self):
        """Return the polygon's bounding box as ``(minx, miny, maxx, maxy)``."""
        bounding_box = self.polygon.bounds
        return bounding_box

class TrackedObject:
    """
    Represents a rectangular object that moves along a path within the environment.

    The object visits ``n_waypoints`` random locations inside the environment.
    For every location one rectangle (long side = 4 x short side) with a random
    orientation is stored, so ``shapes[t]`` is the object's footprint at time
    step ``t`` and ``path[t]`` is the matching centre point.
    """
    def __init__(self, environment_polygon, object_id, n_waypoints=5, short_side_length=0.05, seed=None):
        """
        Args:
            environment_polygon: Shapely polygon the object has to stay inside.
            object_id: Identifier stored as ``self.id``.
            n_waypoints: Number of path steps; values below 1 give an empty path.
            short_side_length: Length of the rectangle's short side.
            seed: Optional seed; when given, the global ``random`` and
                ``numpy.random`` generators are re-seeded with it.

        Raises:
            ValueError: If the object does not fit into the environment.
        """
        self.id = object_id
        self.env_poly = environment_polygon
        self.n_waypoints = n_waypoints
        self.short_side = short_side_length
        self.long_side = 4 * self.short_side
        self.seed = seed
        self.waypoints = []  # sampled points, in sampling order
        self.path = []       # the same points, in visiting order
        self.shapes = []     # one rectangle per entry of ``path``
        self._generate_path()
        self._create_shapes()
        print(f"  -> üéØ TrackedObject ID {self.id} created with a path of {n_waypoints} steps.")

    def _generate_path(self):
        """Sample the waypoints and order them with a greedy nearest-neighbour walk."""
        if self.seed is not None:
            random.seed(self.seed)
            np.random.seed(self.seed)
        # Nothing to sample: leave the path empty instead of failing further
        # down on an empty distance matrix / missing start index.
        if self.n_waypoints < 1:
            self.path = []
            return
        half_short = self.short_side / 2.0
        half_long = self.long_side / 2.0
        # Half the rectangle's diagonal: shrinking the environment by this much
        # keeps the rectangle inside for every possible rotation.
        margin = np.sqrt(half_long**2 + half_short**2)
        safe_sampling_area = self.env_poly.buffer(-margin)
        if safe_sampling_area.is_empty:
            raise ValueError("Object size is too large for the environment.")
        minx, miny, maxx, maxy = safe_sampling_area.bounds
        # Rejection sampling inside the bounding box of the safe area.
        while len(self.waypoints) < self.n_waypoints:
            point = Point(random.uniform(minx, maxx), random.uniform(miny, maxy))
            if safe_sampling_area.contains(point):
                self.waypoints.append(point)
        pts_arr = np.array([(p.x, p.y) for p in self.waypoints])
        # Pairwise Euclidean distances between all waypoints.
        dist_matrix = np.sqrt(np.sum((pts_arr[:, np.newaxis, :] - pts_arr[np.newaxis, :, :])**2, axis=-1))
        # Start at the first sampled waypoint and always move on to the closest
        # waypoint that has not been visited yet.
        unvisited = set(range(1, self.n_waypoints))
        path_indices = [0]
        while unvisited:
            last_idx = path_indices[-1]
            next_idx = min(unvisited, key=lambda i: dist_matrix[last_idx, i])
            path_indices.append(next_idx)
            unvisited.remove(next_idx)
        self.path = [self.waypoints[i] for i in path_indices]

    def _create_shapes(self):
        """Create one randomly rotated rectangle centred on every path point."""
        half_short = self.short_side / 2.0
        half_long = self.long_side / 2.0
        for point in self.path:
            # Build the rectangle around the origin, rotate it there, then shift
            # its vertices onto the path point.
            rectangle = box(-half_long, -half_short, half_long, half_short)
            rotated_rectangle = rotate(rectangle, random.uniform(0, 360), origin='center')
            moved_rectangle = Polygon([(p[0] + point.x, p[1] + point.y) for p in rotated_rectangle.exterior.coords])
            self.shapes.append(moved_rectangle)

class Camera:
    """
    Represents a camera sensor that casts rays and records what each ray hits.

    After ``cast_rays`` every entry of ``self.rays`` is
    ``[start_xy, end_xy, hit_id]`` where ``hit_id`` is the index (within the
    ``obstacles`` sequence given to ``cast_rays``) of the obstacle the ray
    stopped on, or ``None`` if it stopped elsewhere.
    """
    def __init__(self, position, camera_range=3.0, n_rays=36):
        """
        Args:
            position: Shapely ``Point`` the rays start from.
            camera_range: Maximum length of a ray.
            n_rays: Number of rays, spread evenly over 360 degrees.
        """
        self.position = position
        self.range = camera_range
        self.n_rays = n_rays
        self.rays = []
        print(f"  -> üì∑ Camera created at ({position.x:.2f}, {position.y:.2f}).")

    @staticmethod
    def _candidate_points(intersection):
        """Return the points of an intersection geometry at which a ray may stop."""
        parts = intersection.geoms if hasattr(intersection, 'geoms') else [intersection]
        candidates = []
        for part in parts:
            if part.is_empty:
                continue
            if part.geom_type == 'Point':
                candidates.append(part)
            elif part.geom_type == 'LineString':
                # A ray crossing a filled polygon overlaps it along a segment.
                # Keep every vertex of that segment rather than only the first
                # one: the caller picks the nearest, so the result no longer
                # depends on the coordinate order of the overlay result.
                candidates.extend(Point(xy) for xy in part.coords)
        return candidates

    def cast_rays(self, environment_polygon, obstacles):
        """
        Casts rays, storing the index of the hit obstacle.

        Args:
            environment_polygon: Polygon whose boundary stops the rays.
            obstacles: Iterable of shapely geometries that block the rays.
        """
        self.rays = []
        obstacles = list(obstacles)
        origin = self.position.coords[0]
        all_boundaries = obstacles + [environment_polygon.boundary]
        for angle in np.linspace(0, 360, self.n_rays, endpoint=False):
            theta = np.radians(angle)
            end_x = self.position.x + self.range * np.cos(theta)
            end_y = self.position.y + self.range * np.sin(theta)
            ray = LineString([self.position, (end_x, end_y)])
            intersection_points = []
            for boundary in all_boundaries:
                if ray.intersects(boundary):
                    intersection_points.extend(self._candidate_points(ray.intersection(boundary)))
            # Without any intersection the ray runs to its full length.
            end_point = ray.coords[1]
            hit_id = None
            if intersection_points:
                closest_point = min(intersection_points, key=lambda p: self.position.distance(p))
                end_point = closest_point.coords[0]
                # The stop point belongs to the first obstacle it touches (within
                # a small tolerance); otherwise it lies on the environment boundary.
                for i, obs in enumerate(obstacles):
                    if obs.distance(closest_point) < 1e-9:
                        hit_id = i
                        break
            self.rays.append([origin, end_point, hit_id])

class Scene:
    """
    Manages the setup of the simulation and updates its state over time.

    A scene owns one ``Environment``, a list of ``TrackedObject`` instances and
    a list of ``Camera`` instances. The number of time steps equals the number
    of waypoints of each object.
    """
    def __init__(self, n_objects=2, n_waypoints=10, n_cameras=3, seed=42):
        """
        Args:
            n_objects: Number of tracked objects to create.
            n_waypoints: Path length of every object (= number of time steps).
            n_cameras: Number of cameras to place.
            seed: Base seed for the scene; ``None`` leaves the generators unseeded.
        """
        print("üöÄ Initializing Scene Setup...")
        self.seed = seed
        # Seed before the environment is built so that a seeded scene also gets
        # a reproducible boundary (objects and cameras re-seed on their own).
        # NOTE(review): assumes polygenerator draws from the global `random`
        # (or `numpy.random`) generator - confirm against its implementation.
        if self.seed is not None:
            random.seed(self.seed)
            np.random.seed(self.seed)
        self.env = Environment()
        self.objects = []
        self.cameras = []
        self.num_timesteps = n_waypoints

        self._generate_objects(n_objects, n_waypoints)
        self._place_cameras(n_cameras)
        print("‚úÖ Scene setup complete!")

    def _generate_objects(self, n_objects, n_waypoints):
        """Create the tracked objects; object ``i`` is seeded with ``seed + i``."""
        print(f"üèÉ Generating {n_objects} tracked objects...")
        for i in range(n_objects):
            obj_seed = self.seed + i if self.seed is not None else None
            obj = TrackedObject(self.env.polygon, object_id=i, n_waypoints=n_waypoints, seed=obj_seed)
            self.objects.append(obj)

    def _place_cameras(self, n_cameras):
        """Place cameras at random points inside the environment that no object shape ever covers."""
        print(f"üé• Placing {n_cameras} cameras...")
        if self.seed is not None:
            random.seed(self.seed + 100)
        # Every footprint of every object over all time steps.
        all_future_obstacles = [shape for obj in self.objects for shape in obj.shapes]
        minx, miny, maxx, maxy = self.env.get_bounds()
        # Rejection sampling inside the environment's bounding box.
        while len(self.cameras) < n_cameras:
            point = Point(random.uniform(minx, maxx), random.uniform(miny, maxy))
            if not self.env.polygon.contains(point):
                continue  # outside the environment: no need to test the obstacles
            if any(obs.contains(point) for obs in all_future_obstacles):
                continue
            self.cameras.append(Camera(point))

    def update_scene_at_timestep(self, time_step):
        """
        Re-cast the rays of every camera against the object shapes of ``time_step``.

        An out-of-range ``time_step`` only prints a warning and leaves the
        cameras untouched.
        """
        if not (0 <= time_step < self.num_timesteps):
            print(f"‚ö†Ô∏è Warning: Time step {time_step} is out of bounds.")
            return
        current_obstacles = [obj.shapes[time_step] for obj in self.objects]
        for cam in self.cameras:
            cam.cast_rays(self.env.polygon, current_obstacles)

class VisibilityCalculator:
    """
    Turns the rays currently stored on the scene's cameras into per-object hit counts.
    """
    def __init__(self, scene):
        self.scene = scene

    def calculate_at_timestep(self):
        """
        Tally, for every object id, how many camera rays carry that id as their hit.

        Returns:
            dict mapping each object's ``id`` to its number of hits (0 if none).
        """
        tally = {}
        for tracked in self.scene.objects:
            tally[tracked.id] = 0
        # Flatten the rays of all cameras first, then count the ones that hit something.
        collected = []
        for camera in self.scene.cameras:
            collected.extend(camera.rays)
        for entry in collected:
            target = entry[2]
            if target is None:
                continue
            tally[target] += 1
        return tally

class Visualizer:
    """
    Draws one matplotlib figure per time step of a scene.
    """
    def __init__(self, scene):
        self.scene = scene

    def plot_timestep(self, time_step, visibility_scores, save_fig=False, output_dir="."):
        """
        Plot the environment, the objects, the cameras and their current rays.

        Args:
            time_step: Index into every object's ``shapes`` list.
            visibility_scores: Mapping of object id to hit count; missing ids show as 0.
            save_fig: If true, write ``timestep_<NNN>.png`` into ``output_dir`` and
                close the figure; otherwise show the figure.
            output_dir: Target directory for saved frames (created if missing).
        """
        fig, ax = plt.subplots(figsize=(10, 10))
        object_colors = ['#FF1493', '#00BFFF', '#32CD32', '#FFD700', '#9400D3', '#FF4500']

        # Plot Environment
        x, y = self.scene.env.polygon.exterior.xy
        ax.plot(x, y, color='black', linewidth=2, label='Environment Boundary')

        # Plot Tracked Objects
        for i, obj in enumerate(self.scene.objects):
            # Colours repeat when there are more objects than palette entries.
            primary_color_hex = object_colors[i % len(object_colors)]

            # Plot the simple path for context
            path_x = [p.x for p in obj.path]
            path_y = [p.y for p in obj.path]
            ax.plot(path_x, path_y, 'o--', color=primary_color_hex, markersize=5, alpha=0.3)

            # Plot the object's current position
            current_shape = obj.shapes[time_step]
            sx, sy = current_shape.exterior.xy
            ax.fill(sx, sy, alpha=0.9, color=primary_color_hex, label=f'Object {i+1} Position')

            # Write the hit count onto the geometric centre of the object.
            centroid = current_shape.centroid
            score = visibility_scores.get(obj.id, 0)
            ax.text(centroid.x, centroid.y, f"Hits: {score}",
                    ha='center', va='center',  # Center the text block on the point
                    fontweight='bold', fontsize=9,
                    color='white',
                    bbox=dict(boxstyle='round,pad=0.2', fc='black', alpha=0.5),
                    zorder=100)

        # Plot Cameras and their current rays
        for cam in self.scene.cameras:
            ax.plot(cam.position.x, cam.position.y, 'ro', markersize=8, label='Camera')
            for start, end, hit_id in cam.rays:
                # Rays that stopped on an object are red, all others cyan.
                ray_color = 'red' if hit_id is not None else 'cyan'
                ax.plot([start[0], end[0]], [start[1], end[1]], color=ray_color, linewidth=0.7)

        ax.set_aspect('equal', adjustable='box')
        ax.set_title(f"Simulation at Time Step: {time_step}", fontsize=16)
        # Several artists share a label (e.g. every camera); keep one legend entry per label.
        handles, labels = ax.get_legend_handles_labels()
        by_label = dict(zip(labels, handles))
        ax.legend(by_label.values(), by_label.keys())

        if save_fig:
            os.makedirs(output_dir, exist_ok=True)
            filepath = os.path.join(output_dir, f"timestep_{time_step:03d}.png")
            # Save this figure explicitly instead of whatever pyplot currently
            # considers the active figure.
            fig.savefig(filepath)
            plt.close(fig)
        else:
            plt.show()

# --- Main Execution ---
# Builds a seeded scene, then for every time step re-casts the camera rays,
# counts the hits per object and renders (or saves) one frame.
if __name__ == "__main__":
    SAVE_PLOTS = True  # True: write PNG frames to OUTPUT_DIRECTORY; False: show each figure
    OUTPUT_DIRECTORY = "simulation_centered_text"
    NUMBER_OF_TIMESTEPS = 15  # passed as n_waypoints: one waypoint per time step

    simulation_scene = Scene(n_objects=2, n_waypoints=NUMBER_OF_TIMESTEPS, n_cameras=3, seed=42)
    visualizer = Visualizer(simulation_scene)
    calculator = VisibilityCalculator(simulation_scene)

    print(f"\nüé¨ Running simulation for {NUMBER_OF_TIMESTEPS} time steps...")

    for t in range(simulation_scene.num_timesteps):
        # The rays must be refreshed first: the calculator and the visualizer
        # both read the rays currently stored on the cameras.
        simulation_scene.update_scene_at_timestep(t)
        visibility_scores = calculator.calculate_at_timestep()
        print(f"  -> üñºÔ∏è  Generating frame {t}... Visibility Scores: {visibility_scores}")
        visualizer.plot_timestep(t, visibility_scores=visibility_scores, save_fig=SAVE_PLOTS, output_dir=OUTPUT_DIRECTORY)
    
    if SAVE_PLOTS:
        print(f"\n‚úÖ Simulation complete! {simulation_scene.num_timesteps} frames saved to '{OUTPUT_DIRECTORY}'.")

üöÄ Initializing Scene Setup...
üó∫Ô∏è  Environment created with 12 vertices.
üèÉ Generating 2 tracked objects...
  -> üéØ TrackedObject ID 0 created with a path of 15 steps.
  -> üéØ TrackedObject ID 1 created with a path of 15 steps.
üé• Placing 3 cameras...
  -> üì∑ Camera created at (0.58, 0.68).
  -> üì∑ Camera created at (0.17, 0.56).
  -> üì∑ Camera created at (0.87, 0.54).
‚úÖ Scene setup complete!

üé¨ Running simulation for 15 time steps...
  -> üñºÔ∏è  Generating frame 0... Visibility Scores: {0: 6, 1: 13}
  -> üñºÔ∏è  Generating frame 1... Visibility Scores: {0: 8, 1: 7}
  -> üñºÔ∏è  Generating frame 2... Visibility Scores: {0: 14, 1: 5}
  -> üñºÔ∏è  Generating frame 3... Visibility Scores: {0: 13, 1: 6}
  -> üñºÔ∏è  Generating frame 4... Visibility Scores: {0: 9, 1: 22}
  -> üñºÔ∏è  Generating frame 5... Visibility Scores: {0: 7, 1: 22}
  -> üñºÔ∏è  Generating frame 6... Visibility Scores: {0: 12, 1: 7}
  -> üñºÔ∏è  Generating frame 7... Visibility Scores

In [4]:
!pip install opencv-python-headless


Collecting opencv-python-headless
  Downloading opencv_python_headless-4.12.0.88-cp37-abi3-manylinux2014_x86_64.manylinux_2_17_x86_64.whl.metadata (19 kB)
Downloading opencv_python_headless-4.12.0.88-cp37-abi3-manylinux2014_x86_64.manylinux_2_17_x86_64.whl (54.0 MB)
[2K   [90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m [32m54.0/54.0 MB[0m [31m51.8 MB/s[0m  [33m0:00:01[0m6m0:00:01[0m00:01[0m
[?25hInstalling collected packages: opencv-python-headless
Successfully installed opencv-python-headless-4.12.0.88


In [5]:
## Make the images from simulation_centered_text into a video with 1 second per image. 

import os
import cv2
import glob

def create_video_from_images(image_folder, output_video_path, fps=1):
    """
    Creates a video from a sequence of images in a folder.

    Only ``.png`` files named ``<name>_<number>[...].png`` are used; they are
    ordered by that number. Files with other names and images that cannot be
    decoded are skipped with a warning. The first readable image defines the
    frame size; later images of a different size are resized to it.

    Args:
        image_folder (str): Path to the folder containing the images.
        output_video_path (str): Path to save the output video file (e.g., 'simulation.mp4').
        fps (int): Frames per second for the video.

    Returns:
        None. Progress and problems are reported with ``print``.
    """
    print(f"üé¨ Creating video from images in '{image_folder}'...")

    # Sort numerically by the number after the first underscore so that
    # e.g. timestep_2.png comes before timestep_10.png.
    numbered = []
    for path in glob.glob(os.path.join(image_folder, '*.png')):
        try:
            frame_number = int(os.path.basename(path).split('_')[1].split('.')[0])
        except (IndexError, ValueError):
            print(f"Warning: skipping '{path}' (name is not of the form <name>_<number>.png).")
            continue
        numbered.append((frame_number, path))
    images = [path for _, path in sorted(numbered, key=lambda item: item[0])]

    if not images:
        print(f"‚ö†Ô∏è No .png images found in the directory '{image_folder}'. Video creation aborted.")
        return

    writer = None
    size = None
    frames_written = 0
    try:
        for image_path in images:
            frame = cv2.imread(image_path)
            if frame is None:
                # imread signals an unreadable/corrupt file by returning None.
                print(f"Warning: could not read '{image_path}'; skipping it.")
                continue
            if writer is None:
                # The first readable frame fixes the video's frame size.
                height, width = frame.shape[:2]
                size = (width, height)
                # The 'mp4v' codec is a good choice for .mp4 files.
                fourcc = cv2.VideoWriter_fourcc(*'mp4v')
                writer = cv2.VideoWriter(output_video_path, fourcc, fps, size)
                if not writer.isOpened():
                    print(f"Warning: could not open '{output_video_path}' for writing. Video creation aborted.")
                    return
            elif (frame.shape[1], frame.shape[0]) != size:
                # The writer expects every frame to have the size it was opened with.
                print(f"Warning: resizing '{image_path}' to {size[0]}x{size[1]}.")
                frame = cv2.resize(frame, size)
            writer.write(frame)
            frames_written += 1
    finally:
        # Release the video writer to save the file, even if a frame failed.
        if writer is not None:
            writer.release()

    if frames_written == 0:
        print(f"Warning: none of the images in '{image_folder}' could be read. Video creation aborted.")
        return
    print(f"‚úÖ Video saved successfully to '{output_video_path}'")
    
# --- Configuration ---
# Folder holding the PNG frames; make sure this matches the output directory
# used by the simulation cell.
IMAGE_DIRECTORY = "simulation_centered_text" 
VIDEO_FILENAME = "simulation_video.mp4"  # output file of the video writer
VIDEO_FPS = 1 # 1 frame per second, i.e. every image is shown for one second

# --- Call the function to create the video ---
create_video_from_images(
    image_folder=IMAGE_DIRECTORY, 
    output_video_path=VIDEO_FILENAME, 
    fps=VIDEO_FPS
)

üé¨ Creating video from images in 'simulation_centered_text'...
‚úÖ Video saved successfully to 'simulation_video.mp4'
