In [1]:
!pip install ipywidgets -qqq

In [2]:
import os
import re
from glob import glob
from pathlib import Path
import numpy as np
import pandas as pd
from PIL import Image
import io

# Optional for saving mp4
import cv2

# Optional for Jupyter widget
from IPython.display import HTML, display
import base64
import ipywidgets as widgets

# --- конфигурация цветов (RGBA 0-255) ---
COLOR_MAP = {
    'bud': (0, 0, 255, 255),        # синий
    'producer': (0, 200, 0, 255),   # зелёный
    'conductor': (139, 69, 19, 255) # коричневый
}
EMPTY_COLOR = (255, 255, 255, 255)  # белый

# --- читаем размеры ---
def read_meta(meta_path='save_meta.txt'):
    s = Path(meta_path).read_text().strip()
    h, w = [int(x) for x in re.split(r'\s*,\s*', s)]
    return h, w

# --- список snap файлов, сортировка по индексу ---
def list_snap_files(saves_dir='saves', pattern='snap_*.csv'):
    files = glob(os.path.join(saves_dir, pattern))
    # извлечь индекс из имени snap_{i}.csv
    def idx(path):
        m = re.search(r'snap_(\d+)\.csv$', path)
        return int(m.group(1)) if m else float('inf')
    return sorted(files, key=idx)

# --- парсинг одного CSV в массив (height,width,4) RGBA ---
def csv_to_frame(csv_path, height, width, color_map=COLOR_MAP, empty_color=EMPTY_COLOR):
    # создаём белый фон
    img = np.zeros((height, width, 4), dtype=np.uint8)
    img[:, :] = empty_color
    # читаем csv: предполагаем без заголовка x,y,kind
    # допускаем пробелы, разные разделители — pandas справится
    df = pd.read_csv(csv_path, header=None, names=['x','y','kind'])
    # в файле x,y — предполагаем координаты в пределах width,height
    for _, row in df.iterrows():
        x = int(row['x'])
        y = int(row['y'])
        kind = str(row['kind']).strip()
        if 0 <= y < height and 0 <= x < width:
            color = color_map.get(kind, empty_color)
            img[y, x] = color
    if not img.any():
        return None
    return img

# --- собрать все кадры ---
def build_frames(saves_dir='saves', meta_path='save_meta.txt'):
    h, w = read_meta(meta_path)
    files = list_snap_files(saves_dir)
    frames = []
    for p in files:
        frame = csv_to_frame(p, h, w)
        if frame is None:
            break
        frames.append(frame)
    return frames  # список numpy массивов RGBA

# --- сохранить frames как GIF ---
def save_gif(frames, out_path='out.gif', duration_ms=200):
    pil_frames = [Image.fromarray(f) for f in frames]
    pil_frames[0].save(out_path, save_all=True, append_images=pil_frames[1:],
                       duration=duration_ms, loop=0)
    return out_path

# --- сохранить как mp4 (опционально) ---
def save_mp4(frames, out_path='out.mp4', fps=10):
    # frames: list of RGBA numpy arrays; convert to BGR for cv2 and drop alpha
    h, w = frames[0].shape[:2]
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    writer = cv2.VideoWriter(out_path, fourcc, fps, (w, h))
    for f in frames:
        bgr = cv2.cvtColor(f, cv2.COLOR_RGBA2BGR)
        writer.write(bgr)
    writer.release()
    return out_path

# --- интерактивный плеер для Jupyter: слайдер + autoplay ---
def interactive_player(frames, fps_default=5):
    # конвертируем кадр в PNG data-uri для быстрых переключений в браузере
    png_data = []
    for f in frames:
        pil = Image.fromarray(f)
        buf = io.BytesIO()
        pil.save(buf, format='PNG')
        b64 = base64.b64encode(buf.getvalue()).decode('ascii')
        png_data.append(f"data:image/png;base64,{b64}")

    slider = widgets.IntSlider(min=0, max=len(frames)-1, step=1, value=0, description='Frame')
    img_widget = widgets.Image(value=base64.b64decode(png_data[0].split(',',1)[1]), format='png')
    # play widget
    play = widgets.Play(value=0, min=0, max=len(frames)-1, step=1, interval=1000//fps_default)
    widgets.jslink((play, 'value'), (slider, 'value'))

    def on_slider_change(change):
        idx = change['new']
        img_widget.value = base64.b64decode(png_data[idx].split(',',1)[1])

    slider.observe(on_slider_change, names='value')

    controls = widgets.HBox([play, slider])
    display(widgets.VBox([img_widget, controls]))
    return {'image_widget': img_widget, 'play': play, 'slider': slider}

# --- объединяющая функция, которую нужно вызвать ---
def make_interactive_from_saves(saves_dir='saves', meta_path='save_meta.txt', save_gif_path=None, save_mp4_path=None, fps=5):
    frames = build_frames(saves_dir, meta_path)
    if len(frames) == 0:
        raise RuntimeError("No frames found in saves directory.")
    # показать интерактивный контрол в Jupyter
    # player = interactive_player(frames, fps_default=fps)
    # опционально сохранить
    outputs = {}
    if save_gif_path:
        outputs['gif'] = save_gif(frames, save_gif_path, duration_ms=int(1000/fps))
    if save_mp4_path:
        outputs['mp4'] = save_mp4(frames, save_mp4_path, fps=fps)
    return {'frames': frames, 'player_widgets': player, **outputs}


In [3]:
import os
import re
from glob import glob
from pathlib import Path
import numpy as np
import pandas as pd
import cv2

# --- конфигурация цветов (RGB 0-255) ---
COLOR_MAP = {
    'bud': (0, 0, 255),        # синий
    'producer': (0, 200, 0),   # зелёный
    'conductor': (139, 69, 19) # коричневый
}
EMPTY_COLOR = (255, 255, 255)  # белый

# --- читаем размеры (height,width) ---
def read_meta(meta_path='save_meta.txt'):
    s = Path(meta_path).read_text().strip()
    h, w = [int(x) for x in re.split(r'\s*,\s*', s)]
    if h <= 0 or w <= 0:
        raise ValueError("Invalid meta sizes")
    return h, w

# --- список snap файлов, сортировка по индексу ---
def list_snap_files(saves_dir='saves', pattern='snap_*.csv'):
    files = glob(os.path.join(saves_dir, pattern))
    def idx(path):
        m = re.search(r'snap_(\d+)\.csv$', os.path.basename(path))
        return int(m.group(1)) if m else float('inf')
    return sorted(files, key=idx)

# --- создать пустой кадр RGB ---
def make_empty_frame(height, width, empty_color=EMPTY_COLOR):
    # contiguous uint8 array (height, width, 3)
    return np.full((height, width, 3), empty_color, dtype=np.uint8)

# --- парсинг одного CSV в существующий массив (inplace) ---
def fill_frame_from_csv_into(arr, csv_path, color_map=COLOR_MAP):
    # arr: numpy array (H,W,3) uint8 — изменяется на месте
    # Читаем CSV в память, но только три колонки; ожидаем много строк, поэтому задаём dtype
    try:
        df = pd.read_csv(csv_path, header=None, usecols=[0,1,2], names=['x','y','kind'],
                         dtype={'x': np.int32, 'y': np.int32, 'kind': object}, na_filter=False)
    except Exception:
        return 0  # ничего не записано
    if df.empty:
        return 0
    # фильтруем координаты в рамках
    h, w = arr.shape[:2]
    xs = df['x'].to_numpy(np.int32)
    ys = df['y'].to_numpy(np.int32)
    kinds = df['kind'].to_numpy(dtype=object)

    valid = (xs >= 0) & (xs < w) & (ys >= 0) & (ys < h)
    if not np.any(valid):
        return 0
    xs = xs[valid]; ys = ys[valid]; kinds = kinds[valid]

    # группируем по kind для минимизации обращений Python
    unique_kinds, inv = np.unique(kinds, return_inverse=True)
    count = 0
    for ki, kind in enumerate(unique_kinds):
        color = color_map.get(kind, None)
        if color is None:
            continue
        mask = (inv == ki)
        xs_k = xs[mask]; ys_k = ys[mask]
        arr[ys_k, xs_k] = color  # numpy advanced indexing (vectorized)
        count += xs_k.size
    return count

# --- собрать и вернуть список путей (без чтения всех файлов в память) ---
def build_snap_paths(saves_dir='saves', meta_path='save_meta.txt'):
    _ = read_meta(meta_path)  # validate meta
    files = list_snap_files(saves_dir)
    return files

# --- сохранить все кадры в mp4 потоком (не держа все в памяти) ---
def write_mp4_stream(snap_paths, out_path='out.mp4', meta_path='save_meta.txt', fps=10, color_map=COLOR_MAP):
    from tqdm import tqdm
    h, w = read_meta(meta_path)
    if len(snap_paths) == 0:
        raise RuntimeError("No snap files")
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    writer = cv2.VideoWriter(out_path, fourcc, float(fps), (w, h))
    if not writer.isOpened():
        raise RuntimeError("cv2.VideoWriter failed to open")

    frame = make_empty_frame(h, w)  # reuse one numpy array for speed
    any_written = False
    for p in tqdm(snap_paths):
        # reset to empty (fast)
        frame[:] = EMPTY_COLOR
        n = fill_frame_from_csv_into(frame, p, color_map=color_map)
        if n == 0:
            # записываем пустой кадр тоже, если это нужно; если нет — пропускаем
            writer.write(frame[..., ::-1])  # RGB->BGR
            continue
        writer.write(frame[..., ::-1])
        any_written = True

    writer.release()
    return out_path

# --- опционально: получить все кадры в память (экономно если их немного) ---
def load_frames_to_list(snap_paths, meta_path='save_meta.txt', color_map=COLOR_MAP):
    from tqdm import tqdm
    h, w = read_meta(meta_path)
    frames = []
    frame = make_empty_frame(h, w)
    for p in tqdm(snap_paths):
        frame[:] = EMPTY_COLOR
        fill_frame_from_csv_into(frame, p, color_map=color_map)
        frames.append(frame.copy())  # копия необходима при переиспользовании
    return frames

# --- объединённая функция: либо stream->mp4, либо вернуть пути/фреймы ---
def process_saves(saves_dir='saves', meta_path='save_meta.txt', out_mp4_path=None, fps=10, return_frames=False):
    snap_paths = build_snap_paths(saves_dir, meta_path)
    if out_mp4_path:
        mp4 = write_mp4_stream(snap_paths, out_path=out_mp4_path, meta_path=meta_path, fps=fps)
    else:
        mp4 = None
    frames = None
    if return_frames:
        frames = load_frames_to_list(snap_paths, meta_path=meta_path)
    return {'snap_paths': snap_paths, 'mp4': mp4, 'frames': frames}


In [5]:
# make_interactive_from_saves(saves_dir='../saves', meta_path='../saves/snap_meta.txt', save_gif_path='out.gif', save_mp4_path='out.mp4', fps=10);
process_saves(saves_dir='../saves', meta_path='../saves/snap_meta.txt', fps=10, out_mp4_path='out.mp4');

100%|██████████| 10000/10000 [06:34<00:00, 25.33it/s]
