# **N11. Time analysis**

In [1]:
import numpy as np
import pandas as pd
import libraries.utilities as ut
import libraries.starfunctions as sf
from datetime import datetime
import os
from astropy.io import fits

In [2]:
def average_timestamp(beg_time, end_time):
    """
    Take the average time of observation and convert into UTC.
    """
    time_avg = (end_time + beg_time) / 2
    time_avg = sf.et2utc(time_avg)
    return datetime.strptime(time_avg, "%Y %b %d %H:%M:%S.%f")

In [3]:
# Load kernel.
KERNEL_PATH = "/home/stefano98/University of Padua/thesis/Packages/Solar-orbiter/kernels/mk/"
KERNEL_NAME = "solo_ANC_soc-flown-mk.tm"
spice = sf.load_kernel(KERNEL_NAME, KERNEL_PATH)

# Load data.
fits_headers = pd.DataFrame(pd.read_pickle("resources/metis_all25022025151324.pkl"))
fits_headers = fits_headers[fits_headers["filter"] == "VL"].reset_index(drop = True)

# Convert time.
fits_headers["UTC_TIME"] = fits_headers.apply(lambda x: average_timestamp(x["obt_end"], x["obt_beg"]), axis = 1)

In [4]:
# Convert time into UTC.
fits_headers = fits_headers.sort_values("UTC_TIME").reset_index(drop=True)
fits_headers["DIFF_TIME"] = fits_headers["UTC_TIME"].shift(-1) - fits_headers["UTC_TIME"]
fits_headers["DIFF_TIME"] = fits_headers["DIFF_TIME"].apply(lambda x: x.total_seconds())

# Group similar time values.
tol = 0.5
fits_headers["diff_bin"] = (np.round(fits_headers["DIFF_TIME"] / tol) * tol).astype(float)

In [5]:
counts = fits_headers.groupby(["stp", "diff_bin"]).size().reset_index(name = "count")

top_per_stp = counts.loc[counts.groupby("stp")["count"].idxmax()] \
                    .sort_values("count", ascending=False) \
                    .reset_index(drop=True)

In [6]:
test_stp = counts.sort_values("count", ascending= False).head(100).sort_values("diff_bin", ascending= False)["stp"].iloc[0]

In [7]:
links = fits_headers[fits_headers["stp"] == test_stp]["url"].to_list()
FOLDER = "resources/fits images"

In [8]:
def download_files(url_list, folder_output):
    """
    Download a set of files and save in a folder.
    """
    failed_downloads = []

    for i in range(len(url_list)):
        try:
            ut.download_fits(url_list[i], str(i)+".fits", folder_output)
        except:
            failed_downloads.append(i)

    print("Download completed!")
    
    return failed_downloads

In [9]:
images = os.listdir(FOLDER)

fits_images = []

for i in range(len(images)):
    img = fits.open(os.path.join(FOLDER, images[i]))[0].data 
    fits_images.append(img)

In [10]:
#download_files(links, FOLDER)

In [11]:
import numpy as np
import imageio.v3 as iio
from itertools import tee
from typing import Iterable, Tuple, Optional, Union

ArrayLike = Union[np.ndarray, "np.typing.ArrayLike"]

def _to_uint8(frame: ArrayLike,
              vmin: Optional[float] = None,
              vmax: Optional[float] = None,
              percent: Tuple[float, float] = (1, 99),
              log: bool = False) -> np.ndarray:
    """
    Scale a single 2D float/uint image to 8-bit for video.
    """
    x = np.asarray(frame)
    if x.ndim != 2:
        raise ValueError(f"Each frame must be 2D (H×W). Got shape {x.shape}.")

    # Use float32 to save memory if needed
    if x.dtype.kind == "f" and x.dtype.itemsize > 4:
        x = x.astype(np.float32, copy=False)

    # Optional logarithmic compression for high dynamic range images
    if log:
        # protect against negatives
        x = np.log1p(np.maximum(x, 0))

    # Determine range
    if vmin is None or vmax is None:
        lo, hi = np.percentile(x, percent)
        vmin = lo if vmin is None else vmin
        vmax = hi if vmax is None else vmax
        if vmax <= vmin:
            vmax = vmin + 1e-9

    # Normalize to [0, 255]
    y = np.clip((x - vmin) / (vmax - vmin), 0, 1)
    return (y * 255).astype(np.uint8)


def _open_video_writer(path: str, fps: int, bitrate: str):
    """
    Try ffmpeg plugin first; if missing, try pyav. Raise a helpful error otherwise.
    """
    # Try ffmpeg
    try:
        return iio.imopen(
            path, "w", plugin="ffmpeg",
            fps=fps, codec="libx264", pix_fmt="yuv420p", bitrate=bitrate
        )
    except ValueError:
        pass  # ffmpeg plugin not registered

    # Try pyav
    try:
        # codec/pix_fmt can be auto-chosen by backend; keep minimal args for portability
        return iio.imopen(path, "w", plugin="pyav", fps=fps)
    except Exception as e:
        raise RuntimeError(
            "No video backend available for imageio. "
            "Install one of:\n"
            "  pip install imageio-ffmpeg  (recommended)\n"
            "  pip install av              (PyAV alternative)\n"
            "and restart the Python kernel."
        ) from e


def make_movie(frames: Iterable[ArrayLike],
               out_path: str = "movie.mp4",
               fps: int = 15,
               percent: Tuple[float, float] = (1, 99),
               vmin: Optional[float] = None,
               vmax: Optional[float] = None,
               log: bool = False,
               downscale: Optional[Tuple[int, int]] = None,
               bitrate: str = "8M") -> str:
    """
    Build a video (H.264 .mp4) or an animated GIF from an iterable of 2D arrays.

    Parameters
    ----------
    frames : iterable of 2D numpy arrays (H×W)
    out_path : output path; ends with .mp4 or .gif
    fps : frames per second
    percent : percentile clip (lo, hi) used when vmin/vmax not given
    vmin, vmax : explicit value range; if None, computed from percentiles
    log : apply log1p() before scaling (useful for high dynamic range)
    downscale : (new_h, new_w) nearest-neighbor downscale or None
    bitrate : video bitrate (e.g., '8M', '4M') for mp4 writers

    Returns
    -------
    out_path : str
        The path written.
    """
    is_gif = out_path.lower().endswith(".gif")

    # Allow generator input; we need two passes if vmin/vmax not provided
    it1, it2 = tee(iter(frames), 2)

    # Compute default vmin/vmax if needed (single pass over frames)
    if vmin is None or vmax is None:
        lows, highs = [], []
        for f in it1:
            x = np.asarray(f)
            if x.ndim != 2:
                raise ValueError(f"Each frame must be 2D (H×W). Got shape {x.shape}.")
            if x.dtype.kind == "f" and x.dtype.itemsize > 4:
                x = x.astype(np.float32, copy=False)
            if log:
                x = np.log1p(np.maximum(x, 0))
            lo, hi = np.percentile(x, percent)
            lows.append(lo); highs.append(hi)
        if not lows:
            raise ValueError("No frames provided.")
        vmin = np.mean(lows) if vmin is None else vmin
        vmax = np.mean(highs) if vmax is None else vmax
        if vmax <= vmin:
            vmax = vmin + 1e-9

    # Prepare writer/collector
    writer = None
    if is_gif:
        rgb_frames = []
    else:
        writer = _open_video_writer(out_path, fps=fps, bitrate=bitrate)

    try:
        for f in it2:
            g = _to_uint8(f, vmin=vmin, vmax=vmax, percent=percent, log=log)

            if downscale is not None:
                new_h, new_w = downscale
                yi = (np.linspace(0, g.shape[0] - 1, new_h)).astype(int)
                xi = (np.linspace(0, g.shape[1] - 1, new_w)).astype(int)
                g = g[np.ix_(yi, xi)]

            # Expand grayscale to RGB for broad codec compatibility
            rgb = np.repeat(g[..., None], 3, axis=-1)

            if is_gif:
                rgb_frames.append(rgb)
            else:
                writer.write(rgb)

        if is_gif:
            # duration per frame in seconds
            iio.imwrite(out_path, rgb_frames, duration=1.0 / fps, loop=0)

    finally:
        if writer is not None:
            writer.close()

    return out_path


In [12]:
make_movie(fits_images, out_path="metis_corona.mp4", fps=12, log=True, downscale=(1024,1024))

RuntimeError: No video backend available for imageio. Install one of:
  pip install imageio-ffmpeg  (recommended)
  pip install av              (PyAV alternative)
and restart the Python kernel.