In [None]:
# Продолжайте развивать генератор данных.
# Решите задачу трекинга объектов.
# Этап 1. Генератор последовательности кадров с потоком клеток, перемещающихся "слева - направо".
#  - сгенерировать последовательность кадров в 10 секунд (итого - примерно 240 кдаров)
# - первичное положение клеток - случайный пиксель
# - при достижении границы изображения объект прекращает свое существование
# - пусть генератор пути клеток позволяет перемещать клетку по синусоиде  y(x(t) ) = ax+b + c * sin( omega *x) + eps_noise
# - a,b,c omega - случайно сгенерированные значения для данной клетки  eps_noise - случайное значение для каждой секунды
# - a,b,c omega - случайно сгенерированные значения для данной клетки
# - 10 секунду - 10 отсчетов времени t,  положение между кажром 0 и кадром 24 интерполируется при помощи линейной интерполяции по формуле:
# - выполнить то же самое, только зависимость не T(z),как в примере, а y(x)

In [None]:
# Этап 2. Трекинг объектов.
# Используя пример, приведенный в лекции про трекинг, решить задачу трекинга объектов
# Пути (положения) объктов на каждом кадре сохранить в файл.
# Визуализировать траектории объектов на изображении.

In [None]:
import os
import random
import cv2
import numpy as np
import math
from tqdm import tqdm
import matplotlib.pyplot as plt

class CellAnimationGenerator:
    def __init__(self):
        self.background_images = []
        self.cell_images = []

    def load_assets(self, bg_folder="background", cell_folder="cells"):
        def load_img_folder(folder):
            return [cv2.imread(os.path.join(folder, f))
                   for f in os.listdir(folder)
                   if f.lower().endswith(('.png', '.jpg', '.jpeg'))]

        self.background_images = load_img_folder(bg_folder)
        self.cell_images = load_img_folder(cell_folder)

        if not self.background_images or not self.cell_images:
            raise ValueError("Missing image assets!")

    def create_composite_background(self, size=(450, 450)):
        composite = np.zeros((*size, 3), dtype=np.uint8)
        tile_size = size[0] // 3

        for i in range(0, size[0], tile_size):
            for j in range(0, size[1], tile_size):
                if self.background_images:
                    tile = random.choice(self.background_images)
                    composite[i:i+tile_size, j:j+tile_size] = cv2.resize(tile, (tile_size, tile_size))
        return composite

    def render_cell(self, canvas, cell_img, position, blur_radius=21):
        x, y = position
        h, w = canvas.shape[:2]
        cell_h, cell_w = cell_img.shape[:2]

        # Calculate overlapping regions
        canvas_x1, canvas_x2 = max(0, x), min(w, x + cell_w)
        canvas_y1, canvas_y2 = max(0, y), min(h, y + cell_h)

        cell_x1 = max(0, -x)
        cell_x2 = cell_w - max(0, x + cell_w - w)
        cell_y1 = max(0, -y)
        cell_y2 = cell_h - max(0, y + cell_h - h)

        # Create alpha mask
        mask = np.zeros((cell_y2-cell_y1, cell_x2-cell_x1), dtype=np.float32)
        center = ((cell_x2-cell_x1)//2, (cell_y2-cell_y1)//2)
        radius = min(center[0], center[1])
        cv2.circle(mask, center, radius, 1, -1)
        mask = cv2.GaussianBlur(mask, (blur_radius, blur_radius), 0)[..., None]

        # Blend images
        canvas[canvas_y1:canvas_y2, canvas_x1:canvas_x2] = (
            canvas[canvas_y1:canvas_y2, canvas_x1:canvas_x2] * (1 - mask) +
            cell_img[cell_y1:cell_y2, cell_x1:cell_x2] * mask
        ).astype(np.uint8)

        return canvas

class CellMotionSimulator:
    def __init__(self, canvas_size):
        self.width, self.height = canvas_size
        self.phase_offset = random.uniform(0, math.pi*2)
        self.wave_frequency = random.uniform(0.03, 0.08)
        self.velocity = random.uniform(8, 12)
        self.amplitude = random.uniform(10, 30)
        self.noise = {
            'phase': random.uniform(0, math.pi*2),
            'frequency': random.uniform(0.5, 2.0),
            'amplitude': random.uniform(1, 3)
        }

        self.x = random.randint(-50, 20)
        self.y = random.randint(0, self.height)
        self.origin_y = self.y

    def update_position(self, delta_time):
        self.x += self.velocity * delta_time

        noise_component = math.sin(self.noise['phase']) * self.noise['amplitude']
        self.noise['phase'] += self.noise['frequency'] * delta_time

        wave_component = self.amplitude * math.sin(self.wave_frequency * self.x)

        self.y = self.origin_y + wave_component + noise_component
        self.y = np.clip(self.y, 0, self.height - 1)

        return self.x < self.width + 100  # Check if still visible

class CellTracker:
    def __init__(self):
        self.trajectories = {}
        self.current_id = 0
        self.colors = np.random.randint(0, 255, (1000, 3))

    def process_detections(self, current_detections):
        if not self.trajectories:
            for det in current_detections:
                self._init_track(det)
            return self.trajectories

        matched = set()
        active_tracks = {}

        for track_id, track in self.trajectories.items():
            last_pos = track['path'][-1]
            closest = self._find_closest(last_pos, current_detections, matched)

            if closest is not None:
                matched.add(closest)
                track['path'].append(self._get_center(current_detections[closest]))
                track['missed'] = 0
                active_tracks[track_id] = track

        # Handle new detections
        for i, det in enumerate(current_detections):
            if i not in matched:
                self._init_track(det)

        return self.trajectories

    def _init_track(self, detection):
        self.trajectories[self.current_id] = {
            'path': [self._get_center(detection)],
            'missed': 0
        }
        self.current_id += 1

    def _find_closest(self, reference, detections, excluded):
        min_dist, best_match = float('inf'), None

        for i, det in enumerate(detections):
            if i in excluded:
                continue

            dist = np.linalg.norm(np.array(reference) - np.array(self._get_center(det)))
            if dist < 25 and dist < min_dist:
                min_dist, best_match = dist, i

        return best_match

    def _get_center(self, bbox):
        x, y, w, h = bbox
        return (x + w//2, y + h//2)

def generate_animation(output_file="cell_animation.mp4", duration=10, fps=24):
    generator = CellAnimationGenerator()
    generator.load_assets()

    canvas_size = (450, 450)
    background = generator.create_composite_background(canvas_size)
    video_writer = cv2.VideoWriter(output_file, cv2.VideoWriter_fourcc(*'mp4v'), fps, canvas_size)

    active_cells = []
    total_frames = duration * fps

    for frame_idx in tqdm(range(total_frames), desc="Generating animation"):
        frame = background.copy()

        # Add new cells periodically
        if len(active_cells) < 8 and (frame_idx % 20 == 0 or len(active_cells) < 2):
            cell_img = random.choice(generator.cell_images)
            size = random.randint(40, 80)
            active_cells.append({
                'simulator': CellMotionSimulator(canvas_size),
                'image': cv2.resize(cell_img, (size, size))
            })

        # Update and render cells
        for cell in active_cells[:]:
            if not cell['simulator'].update_position(1/fps):
                active_cells.remove(cell)
                continue

            position = (int(cell['simulator'].x), int(cell['simulator'].y))
            frame = generator.render_cell(frame, cell['image'], position)

        video_writer.write(frame)

    video_writer.release()
    print(f"Animation saved to {output_file}")

def analyze_cell_movement(video_path):
    capture = cv2.VideoCapture(video_path)
    tracker = CellTracker()
    processed_frames = []

    while capture.isOpened():
        success, frame = capture.read()
        if not success:
            break

        # Detection and tracking
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        blurred = cv2.GaussianBlur(gray, (9, 9), 0)
        threshold = cv2.adaptiveThreshold(blurred, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
                                        cv2.THRESH_BINARY_INV, 11, 2)

        contours, _ = cv2.findContours(threshold, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        detections = []

        for contour in contours:
            area = cv2.contourArea(contour)
            if 100 < area < 3000:
                perimeter = cv2.arcLength(contour, True)
                if perimeter > 0:
                    circularity = 4 * np.pi * area / (perimeter ** 2)
                    if 0.7 < circularity < 1.3:
                        x, y, w, h = cv2.boundingRect(contour)
                        detections.append((x, y, w, h))

        tracker.process_detections(detections)

        # Visualization
        display = frame.copy()
        for track_id, data in tracker.trajectories.items():
            color = tracker.colors[track_id % len(tracker.colors)].tolist()
            path = data['path']

            for i in range(1, len(path)):
                cv2.line(display, path[i-1], path[i], color, 2)

            if path:
                cv2.circle(display, path[-1], 5, color, -1)
                cv2.putText(display, str(track_id), (path[-1][0]+10, path[-1][1]),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 1)

        processed_frames.append(display)

    capture.release()

    # Save results
    os.makedirs("analysis_results", exist_ok=True)

    # Save trajectory data
    with open("analysis_results/trajectories.csv", "w") as f:
        f.write("track_id,frame,x,y\n")
        for track_id, data in tracker.trajectories.items():
            for frame_num, (x, y) in enumerate(data['path']):
                f.write(f"{track_id},{frame_num},{x},{y}\n")

    # Visualize trajectories
    plt.figure(figsize=(10, 10))
    for track_id, data in tracker.trajectories.items():
        if len(data['path']) > 10:
            xs, ys = zip(*data['path'])
            plt.plot(xs, ys, label=f'Cell {track_id}')

    plt.gca().invert_yaxis()
    plt.legend()
    plt.title("Cell Movement Analysis")
    plt.savefig("analysis_results/trajectories_plot.png")
    plt.close()

    print("Analysis complete. Results saved in 'analysis_results' folder.")

if __name__ == "__main__":
    generate_animation()

    analyze_cell_movement("cell_animation.mp4")

Generating animation: 100%|██████████| 240/240 [00:01<00:00, 203.50it/s]


Animation saved to cell_animation.mp4
Analysis complete. Results saved in 'analysis_results' folder.
