In [1]:
import cv2
import glob 

import numpy as np  

In [2]:
# Input/output folders, written as Windows-style relative paths.
SOURCE_FOLDER = ".\\raw_data"
DESTINATION_FOLDER = ".\\processed_videos_2"

# Collect every clip whose file stem is one character long, followed by every
# clip whose stem is two characters long, one sub-folder below SOURCE_FOLDER.
video_files = [
    path
    for pattern in ("\\*\\?.mp4", "\\*\\??.mp4")
    for path in glob.glob(SOURCE_FOLDER + pattern)
]

In [3]:
# Display the list of discovered video paths as this cell's output.
video_files

['.\\raw_data\\1_evening\\1.mp4',
 '.\\raw_data\\2_evening\\2.mp4',
 '.\\raw_data\\3_evening\\3.mp4',
 '.\\raw_data\\4_evening\\4.mp4',
 '.\\raw_data\\5_evening\\5.mp4',
 '.\\raw_data\\6_day\\6.mp4',
 '.\\raw_data\\7_day\\7.mp4',
 '.\\raw_data\\8_day\\8.mp4',
 '.\\raw_data\\9_day\\9.mp4',
 '.\\raw_data\\10_evening\\10.mp4',
 '.\\raw_data\\11_day\\11.mp4',
 '.\\raw_data\\12_day\\12.mp4']

In [4]:
# Character -> Morse code lookup ('.' is a dot, '-' is a dash).
# Covers the letters A-Z and the digits 0-9 only.
MORSE = {
    # Letters
    "A": ".-",
    "B": "-...",
    "C": "-.-.",
    "D": "-..",
    "E": ".",
    "F": "..-.",
    "G": "--.",
    "H": "....",
    "I": "..",
    "J": ".---",
    "K": "-.-",
    "L": ".-..",
    "M": "--",
    "N": "-.",
    "O": "---",
    "P": ".--.",
    "Q": "--.-",
    "R": ".-.",
    "S": "...",
    "T": "-",
    "U": "..-",
    "V": "...-",
    "W": ".--",
    "X": "-..-",
    "Y": "-.--",
    "Z": "--..",
    # Digits
    "0": "-----",
    "1": ".----",
    "2": "..---",
    "3": "...--",
    "4": "....-",
    "5": ".....",
    "6": "-....",
    "7": "--...",
    "8": "---..",
    "9": "----.",
}

In [5]:
def _dot_seconds(wpm: float) -> float:
    """Return the length in seconds of one Morse unit (a dot), computed as 1.2 / wpm."""
    return 1.2 / float(wpm)


def text_to_morse_timeline_frames(text: str, wpm: float, fps: float, noise: float, rng=None):
    """
    Convert text into a list of on/off segments measured in video frames.

    Parameters
    - text: message to encode; it is upper-cased and split into words on any
      run of whitespace (spaces, tabs, newlines).
    - wpm: sending speed; one unit lasts 1.2 / wpm seconds.
    - fps: frame rate used to convert the unit length into frames.
    - noise: jitter amplitude in units (e.g. 0.25 means +/-0.25 unit), drawn
      independently for every segment.
    - rng: optional numpy random Generator; a fresh default one is created
      when None.

    Returns
    - (segments, unit_f) where segments is a list of dicts with keys
      "state" ("on"/"off"), "frames" (int >= 1) and "kind" (one of "dot",
      "dash", "intra", "letter_gap", "word_gap", "unknown"), and unit_f is the
      unit length in frames (at least 1).

    Characters missing from MORSE produce a single 1-frame "off" segment of
    kind "unknown" and are otherwise skipped.
    """
    rng = np.random.default_rng() if rng is None else rng

    unit_s = _dot_seconds(wpm)
    unit_f = max(1, int(round(unit_s * fps)))

    def jitter():
        # noise is in "units" (e.g. 0.25 means +/-0.25 unit)
        return int(round(rng.uniform(-noise, noise) * unit_f))

    def segment(state, nominal_frames, kind):
        # Every jittered segment is clamped so it lasts at least one frame.
        return {"state": state, "frames": max(1, nominal_frames + jitter()), "kind": kind}

    out = []
    # split() without an argument treats tabs/newlines as word separators too;
    # splitting on " " alone would push them into a word as "unknown" symbols.
    words = text.upper().split()

    for wi, word in enumerate(words):
        if wi > 0:
            out.append(segment("off", 7 * unit_f, "word_gap"))

        for li, ch in enumerate(word):
            code = MORSE.get(ch)
            if code is None:
                out.append({"state": "off", "frames": 1, "kind": "unknown"})
                continue

            if li > 0:
                out.append(segment("off", 3 * unit_f, "letter_gap"))

            for si, sym in enumerate(code):
                if si > 0:
                    out.append(segment("off", unit_f, "intra"))

                if sym == ".":
                    out.append(segment("on", unit_f, "dot"))
                else:
                    out.append(segment("on", 3 * unit_f, "dash"))

    return out, unit_f

def multiscale_match(frame, template, scales=np.linspace(0.8,1.2,15)):
    """
    Locate `template` inside `frame` by normalised template matching at several scales.

    Both images are converted from BGR to grayscale. For every scale the
    template is resized and matched with cv2.TM_CCOEFF_NORMED; scales whose
    resized template is narrower/shorter than 8 px or larger than the frame
    are skipped.

    Returns (x, y, w, h, score) for the best-scoring scale, where (x, y) is the
    match location reported by cv2.minMaxLoc and (w, h) the resized template
    size, or None when no scale produced a usable match.
    """
    tpl_h, tpl_w = template.shape[:2]
    frame_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    template_gray = cv2.cvtColor(template, cv2.COLOR_BGR2GRAY)
    frame_h, frame_w = frame.shape[0], frame.shape[1]

    best_score = -1
    best_box = None  # (x, y, w, h) of the strongest match seen so far

    for scale in scales:
        cand_w = int(tpl_w * scale)
        cand_h = int(tpl_h * scale)

        too_small = cand_w < 8 or cand_h < 8
        too_large = cand_w > frame_w or cand_h > frame_h
        if too_small or too_large:
            continue

        scaled_tpl = cv2.resize(template_gray, (cand_w, cand_h), interpolation=cv2.INTER_AREA)
        response = cv2.matchTemplate(frame_gray, scaled_tpl, cv2.TM_CCOEFF_NORMED)
        _, peak, _, peak_loc = cv2.minMaxLoc(response)

        if peak > best_score:
            best_score = peak
            peak_x, peak_y = peak_loc
            best_box = (peak_x, peak_y, cand_w, cand_h)

    if best_box is None:
        return None
    return best_box + (best_score,)


def add_flashlight_inv_r2_blur_alpha(
    frame, x, y,
    radius=200,
    strength=1.0,        # overall intensity
    epsilon=20.0,
    blur_ksize=0,
    color_bgr=(0, 0, 255),
    alpha_max=0.9        # max replacement amount at the center
):
    """
    Alpha-blend a coloured spot with inverse-square falloff onto a copy of `frame`.

    The glow is 1 / (r^2 + epsilon) shifted so it reaches zero at `radius`,
    normalised by its value at the centre, and forced to zero outside the
    radius. It is Gaussian-blurred only when `blur_ksize` is greater than 1
    (an even size is bumped to the next odd one). The per-pixel alpha is
    clip(glow * strength, 0, 1) * alpha_max and the result is
    (1 - alpha) * frame + alpha * color_bgr, returned as uint8.
    """
    rows, cols = frame.shape[:2]
    row_idx, col_idx = np.ogrid[:rows, :cols]

    # Squared distance of every pixel from the spot centre (x, y).
    off_x = col_idx - x
    off_y = row_idx - y
    dist_sq = off_x * off_x + off_y * off_y
    radius_sq = float(radius * radius)

    falloff = 1.0 / (dist_sq.astype(np.float32) + epsilon)
    edge_value = 1.0 / (radius_sq + epsilon)
    glow = np.clip(falloff - edge_value, 0.0, None)

    # Normalise by the centre value when that value is positive.
    centre_value = (1.0 / epsilon) - edge_value
    if centre_value > 0:
        glow /= centre_value
    glow[dist_sq > radius_sq] = 0.0

    if blur_ksize and blur_ksize > 1:
        # GaussianBlur needs an odd kernel size.
        ksize = blur_ksize + 1 if blur_ksize % 2 == 0 else blur_ksize
        glow = cv2.GaussianBlur(glow, (ksize, ksize), 0)

    # Alpha mask: strongest near the centre, capped at alpha_max.
    alpha = np.clip(glow * strength, 0.0, 1.0) * alpha_max  # (H, W)
    alpha_hw1 = alpha[..., None]

    base = frame.astype(np.float32)
    tint = np.array(color_bgr, dtype=np.float32).reshape(1, 1, 3)

    # Composite: (1 - alpha) * frame + alpha * colour
    blended = base * (1.0 - alpha_hw1) + tint * alpha_hw1
    return np.clip(blended, 0, 255).astype(np.uint8)


def add_flashlight_inv_r2_blur_color(
    frame, x, y,
    radius=140,
    strength=1.2,
    epsilon=9.0,
    blur_ksize=41,
    color_bgr=(0, 255, 255)  # yellow
):
    """
    Additively paint a coloured, blurred spot with inverse-square falloff onto a copy of `frame`.

    The glow is 1 / (r^2 + epsilon) shifted so it reaches zero at `radius`,
    normalised by its value at the centre, forced to zero outside the radius
    and then always Gaussian-blurred (an even `blur_ksize` is bumped to the
    next odd value). glow * color_bgr * strength is added to the frame and the
    sum is clipped to 0..255 and returned as uint8.
    """
    rows, cols = frame.shape[:2]
    row_idx, col_idx = np.ogrid[:rows, :cols]

    # Squared distance of every pixel from the spot centre (x, y).
    off_x = col_idx - x
    off_y = row_idx - y
    dist_sq = off_x * off_x + off_y * off_y
    radius_sq = float(radius * radius)

    falloff = 1.0 / (dist_sq.astype(np.float32) + epsilon)
    edge_value = 1.0 / (radius_sq + epsilon)
    glow = np.clip(falloff - edge_value, 0.0, None)

    # Normalise by the centre value when that value is positive.
    centre_value = (1.0 / epsilon) - edge_value
    if centre_value > 0:
        glow /= centre_value

    glow[dist_sq > radius_sq] = 0.0

    # GaussianBlur needs an odd kernel size.
    ksize = blur_ksize + 1 if blur_ksize % 2 == 0 else blur_ksize
    glow = cv2.GaussianBlur(glow, (ksize, ksize), 0)

    # Tint the single-channel glow and add it on top of the frame.
    tint = np.array(color_bgr, dtype=np.float32).reshape(1, 1, 3)
    coloured_glow = glow[..., None] * tint * strength

    lit = frame.astype(np.float32) + coloured_glow
    return np.clip(lit, 0, 255).astype(np.uint8)


def dim_frame_exponential(frame_bgr, k=1.6, min_factor=0.35, use_luma=True):
    """
    Darken a whole BGR frame by one factor derived from its average brightness.

    The factor is exp(-k * brightness), with brightness in 0..1, and is never
    allowed to fall below `min_factor`.
    - k: larger values darken more
    - min_factor: lower bound on the multiplier
    - use_luma: True averages a weighted luminance (0.114 B + 0.587 G + 0.299 R),
      False averages all channel values equally

    Returns a new uint8 frame.
    """
    pixels = frame_bgr.astype(np.float32)

    if not use_luma:
        level = float(np.mean(pixels) / 255.0)
    else:
        blue, green, red = cv2.split(pixels)
        luma = 0.114*blue + 0.587*green + 0.299*red
        level = float(np.mean(luma) / 255.0)

    scale = float(np.exp(-k * level))
    # Apply the floor so the frame never gets darker than min_factor allows.
    scale = scale if scale > min_factor else min_factor

    dimmed = pixels * scale
    return np.clip(dimmed, 0, 255).astype(np.uint8)

def auto_gain_exposure(frame_bgr, target=0.45, k=2.0, min_gain=0.6, max_gain=1.8):
    """
    Scale a BGR frame by one gain that pulls its mean luminance toward `target`.

    - target: desired average luminance (0..1)
    - k: how aggressively to correct
    - min_gain / max_gain: bounds applied to the gain

    The gain is exp(k * (target - mean_luminance)): above 1 when the frame is
    darker than the target, below 1 when it is brighter. Returns a new uint8
    frame.
    """
    pixels = frame_bgr.astype(np.float32)

    blue, green, red = cv2.split(pixels)
    luma = 0.114*blue + 0.587*green + 0.299*red
    mean_luma = float(np.mean(luma) / 255.0)  # 0..1

    # Exponential gain, then clamped into [min_gain, max_gain].
    raw_gain = float(np.exp(k * (target - mean_luma)))
    gain = float(min(max(raw_gain, min_gain), max_gain))

    scaled = pixels * gain
    return np.clip(scaled, 0, 255).astype(np.uint8)


class MarkerLogger:
    """
    Collapse a per-frame stream of (flash_on, kind) samples into timed runs.

    Consecutive frames with the same on/off state and kind are merged into one
    (state, milliseconds, kind) tuple, where milliseconds is
    frame_count / fps * 1000.

    - fps: frame rate used to convert frame counts to milliseconds.
    - ignore_initial_delay: drop every sample whose kind is "initial_delay".
    - start_at_first_on: discard samples until the first frame with
      flash_on=True.
    """

    def __init__(self, fps: float, ignore_initial_delay=True, start_at_first_on=True):
        self.fps = float(fps)
        self.ignore_initial_delay = ignore_initial_delay
        self.start_at_first_on = start_at_first_on

        self.started = False
        self.prev = None  # (state, kind) of the run being accumulated, or None
        self.frames = 0   # number of frames in the run being accumulated
        self.out = []     # completed runs as (state, ms, kind)

    def update(self, flash_on: bool, kind: str):
        """Feed one frame; extends the current run or closes it and opens a new one."""
        if self.ignore_initial_delay and kind == "initial_delay":
            return

        if not self.started:
            if self.start_at_first_on:
                if not flash_on:
                    return
            self.started = True
            self.prev = ("on" if flash_on else "off", kind)
            self.frames = 1
            return

        key = ("on" if flash_on else "off", kind)
        if key == self.prev:
            self.frames += 1
        else:
            self._flush()
            self.prev = key
            self.frames = 1

    def _flush(self):
        """Append the pending run (if any) to the output and clear it."""
        if self.prev is None:
            return
        ms = (self.frames / self.fps) * 1000.0
        state, kind = self.prev
        self.out.append((state, ms, kind))
        # Clear the pending run so flushing again (e.g. finalize() called twice,
        # as happens when a notebook cell is re-run) cannot emit it a second time.
        self.prev = None
        self.frames = 0

    def finalize(self):
        """Close the pending run and return all runs; safe to call repeatedly."""
        self._flush()
        return self.out


class RepeatingMorseFlasher:
    """
    Frame-by-frame player that loops one pre-computed Morse timeline.

    The timeline for `message` is built once in the constructor (so any jitter
    is identical on every repetition). Playback optionally starts with a
    random initial delay and inserts a fixed pause, measured in Morse units,
    after each full pass through the message.
    """

    def __init__(self, message: str, wpm: float, fps: float,
                 seed=None, initial_delay_s=(0.3, 0.6),
                 fixed_repeat_pause_units=7,
                 noise = 0.2
        ):
        self.fps = float(fps)
        self.rng = np.random.default_rng(seed)

        timeline, unit_frames = text_to_morse_timeline_frames(
            message, wpm=wpm, fps=fps, noise=noise, rng=self.rng
        )
        self.base = timeline
        self.unit_f = unit_frames
        if not self.base:
            raise ValueError("Morse timeline is empty")

        # Pause between repetitions, converted from units to frames.
        self.pause_frames_fixed = int(fixed_repeat_pause_units * self.unit_f)

        if initial_delay_s is None:
            self.initial_remaining = 0
        else:
            # Random start delay in seconds, drawn from [low, high], at least one frame.
            low, high = initial_delay_s[0], initial_delay_s[1]
            delay_s = float(self.rng.uniform(low, high))
            self.initial_remaining = max(1, int(round(delay_s * self.fps)))

        self.idx = 0
        self.remaining = max(1, int(self.base[0]["frames"]))
        self.pause_remaining = 0

    def step(self):
        """
        Advance by one frame.

        Returns (flash_on, completed_cycle, kind). During the initial delay and
        the repeat pause the light is off and kind is "initial_delay" or
        "repeat_pause". completed_cycle is True on the frame that finishes the
        last segment of the message.
        """
        if self.initial_remaining > 0:
            self.initial_remaining -= 1
            return False, False, "initial_delay"

        if self.pause_remaining > 0:
            self.pause_remaining -= 1
            return False, False, "repeat_pause"

        segment = self.base[self.idx]
        kind = segment["kind"]
        is_on = segment["state"] == "on"
        cycle_done = False

        self.remaining -= 1
        if self.remaining <= 0:
            # Current segment is used up: move on, wrapping around at the end.
            next_idx = self.idx + 1
            if next_idx >= len(self.base):
                cycle_done = True
                self.pause_remaining = self.pause_frames_fixed
                next_idx = 0
            self.idx = next_idx
            self.remaining = max(1, int(self.base[next_idx]["frames"]))

        return is_on, cycle_done, kind

In [None]:
# For every discovered clip: pick random flashlight/Morse parameters, locate the
# template in each frame, adjust exposure, draw the flashlight while the Morse
# signal is "on", and write the result to DESTINATION_FOLDER.
# NOTE: no random seed is set, so the drawn parameters differ between runs.
for index, video_path in enumerate(video_files):
    # The template image sits next to the video inside the same folder.
    template_path = '\\'.join(video_path.split("\\")[:-1]) + "\\template.png"
    # Any path that does not contain "day" is treated as a night clip.
    time = "day" if "day" in video_path else "night"

    if time == "night":
        random_radius = np.random.randint(30, 35)
        random_strength = np.random.uniform(0.8, 0.9)
        random_epsilon = np.random.uniform(20.0, 25.0)
        random_blur_ksize = np.random.randint(1, 2)  # upper bound is exclusive: always 1
    elif time == "day":
        random_radius = np.random.randint(7, 8)      # upper bound is exclusive: always 7
        random_strength = np.random.uniform(1.0, 1.2)
        random_epsilon = np.random.uniform(40, 50)
        random_blur_ksize = np.random.randint(1, 2)  # upper bound is exclusive: always 1

    color_index = np.random.randint(2)
    random_color = np.array([[100,255,255],[0,0,255]])[color_index]
    random_color_name = "bright yellow" if color_index == 0 else "red"

    morse_index = np.random.randint(4)
    random_morse = {
        0:{"message":"SOS","wpm":6},
        1:{"message":"PARIS","wpm":8},
        2:{"message":"MAYDAY","wpm":8},
        3:{"message":"ONE FATHOM BANK","wpm":10}
    }[morse_index]

    template = cv2.imread(template_path)
    if template is None:
        # cv2.imread signals an unreadable/missing file by returning None.
        raise FileNotFoundError(f"Could not read template image: {template_path}")

    cap = cv2.VideoCapture(video_path)
    out = None
    try:
        # The first frame provides the output size; it is also processed below
        # rather than being thrown away.
        ret, frame = cap.read()
        if not ret:
            raise RuntimeError(f"Could not read any frame from video: {video_path}")
        height, width = frame.shape[:2]

        fps = cap.get(cv2.CAP_PROP_FPS)
        if not fps or fps <= 0:
            raise RuntimeError(f"Video reports an invalid frame rate ({fps}): {video_path}")

        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(DESTINATION_FOLDER + f'\\{index}_{time}_{random_morse["message"]}_{random_morse["wpm"]}wpm_{random_color_name}_nonoise.mp4', fourcc, fps, (width, height))
        delay = max(1, int(1000 / fps))

        # Last accepted match position; None until the first match is seen.
        x_previous = None
        y_previous = None

        flasher = RepeatingMorseFlasher(random_morse["message"], wpm=random_morse["wpm"], fps=fps, fixed_repeat_pause_units=7,noise=0)
        logger = MarkerLogger(fps=fps, ignore_initial_delay=True, start_at_first_on=True)

        cycle_count = 0

        while ret:
            flash_on, completed, kind = flasher.step()
            logger.update(flash_on, kind)

            if completed:
                cycle_count += 1
                print(f"✅ Morse cycle completed: {cycle_count} in video {index}")

            # Matching runs on the unmodified frame, before the exposure change.
            match = multiscale_match(frame, template, scales=np.linspace(0.5,1.0,15))

            if time == "night":
                frame = dim_frame_exponential(frame, k=8, min_factor=0.01, use_luma=True)
            elif time == "day":
                frame = auto_gain_exposure(frame, target=0.9, k=2.3, min_gain=1.5, max_gain=2.0)

            if match:
                x, y, w, h, val = match

                # Accept the match if it is the first one or moved at most 10 px
                # on each axis; otherwise keep the previous position.
                if x_previous is None or (abs(x - x_previous) <= 10 and abs(y - y_previous) <= 10):
                    x_previous, y_previous = x, y
                else:
                    x, y = x_previous, y_previous

                if flash_on:
                    if time == "night":
                        frame = add_flashlight_inv_r2_blur_color(
                            frame,
                            x=x+w//2,
                            y=y+h//2,
                            radius=random_radius,
                            strength=random_strength,
                            epsilon=random_epsilon,
                            blur_ksize=random_blur_ksize,
                            color_bgr=random_color
                        )
                    elif time == "day":
                        frame = add_flashlight_inv_r2_blur_alpha(
                            frame,
                            x=x+w//2,
                            y=y+h//2,
                            radius=random_radius,
                            strength=random_strength,
                            epsilon=random_epsilon,
                            blur_ksize=random_blur_ksize,
                            color_bgr=random_color,
                            alpha_max=0.90
                        )

            out.write(frame)
            cv2.imshow('Frame', frame)
            if cv2.waitKey(delay) & 0xFF == ord('q'):
                break

            ret, frame = cap.read()
    finally:
        # Release the capture, writer and preview window even if a frame fails.
        cap.release()
        if out is not None:
            out.release()
        cv2.destroyAllWindows()


✅ Morse cycle completed: 1
✅ Morse cycle completed: 2
✅ Morse cycle completed: 1
✅ Morse cycle completed: 2
✅ Morse cycle completed: 1
✅ Morse cycle completed: 2
✅ Morse cycle completed: 3
✅ Morse cycle completed: 1
✅ Morse cycle completed: 1
✅ Morse cycle completed: 2
✅ Morse cycle completed: 3
✅ Morse cycle completed: 1
✅ Morse cycle completed: 2
✅ Morse cycle completed: 3
✅ Morse cycle completed: 1
✅ Morse cycle completed: 1
✅ Morse cycle completed: 2
✅ Morse cycle completed: 1
✅ Morse cycle completed: 2
✅ Morse cycle completed: 1
✅ Morse cycle completed: 2
✅ Morse cycle completed: 3
✅ Morse cycle completed: 1
✅ Morse cycle completed: 2
✅ Morse cycle completed: 1
✅ Morse cycle completed: 2
✅ Morse cycle completed: 3


In [7]:
# Reverse lookup: Morse code string -> character.
MORSE_REV = {code: char for char, code in MORSE.items()}

def format_markers_with_letters(markers):
    """
    Render logger markers as text, annotating each finished letter.

    markers: list of (state, ms, kind) tuples from logger.finalize().

    Every tuple is printed as "('state', ms, 'kind')" with ms to one decimal.
    A "letter_gap", "word_gap" or "repeat_pause" marker ends the current group:
    if dots/dashes were collected since the last boundary, " -> 'LETTER'" is
    appended ('?' when the code is not in MORSE_REV); otherwise a word gap or
    repeat pause gets " -> 'Space'". Groups are separated by a blank line.
    Returns the whole thing as a single string.
    """
    boundary_kinds = ("letter_gap", "word_gap", "repeat_pause")
    symbol_for_kind = {"dot": ".", "dash": "-"}

    pieces = []
    pending = []  # dots and dashes of the letter currently being collected

    for state, ms, kind in markers:
        pieces.append(f"('{state}', {ms:.1f}, '{kind}')")

        # Only "on" dot/dash markers contribute to the letter code.
        if state == "on" and kind in symbol_for_kind:
            pending.append(symbol_for_kind[kind])

        if kind not in boundary_kinds:
            pieces.append(", ")
            continue

        if pending:
            decoded = MORSE_REV.get("".join(pending), "?")
            pieces.append(f" -> '{decoded}'")
            pending.clear()
        elif kind != "letter_gap":
            # No letter pending, but this is a word gap / repeat pause.
            pieces.append(" -> 'Space'")

        pieces.append("\n\n")  # blank line between groups

    # Drop any trailing separator characters.
    return "".join(pieces).rstrip(", \n")

# `logger` is the one most recently assigned by the processing loop above, so
# this prints the markers of the last video handled only.
markers = logger.finalize()
print(format_markers_with_letters(markers))  

('on', 200.0, 'dot'), ('off', 200.0, 'intra'), ('on', 200.0, 'dot'), ('off', 200.0, 'intra'), ('on', 200.0, 'dot'), ('off', 600.0, 'letter_gap') -> 'S'

('on', 600.0, 'dash'), ('off', 200.0, 'intra'), ('on', 600.0, 'dash'), ('off', 200.0, 'intra'), ('on', 600.0, 'dash'), ('off', 600.0, 'letter_gap') -> 'O'

('on', 200.0, 'dot'), ('off', 200.0, 'intra'), ('on', 200.0, 'dot'), ('off', 200.0, 'intra'), ('on', 200.0, 'dot'), ('off', 1400.0, 'repeat_pause') -> 'S'

('on', 200.0, 'dot'), ('off', 200.0, 'intra'), ('on', 200.0, 'dot'), ('off', 200.0, 'intra'), ('on', 200.0, 'dot'), ('off', 600.0, 'letter_gap') -> 'S'

('on', 600.0, 'dash'), ('off', 200.0, 'intra'), ('on', 600.0, 'dash'), ('off', 200.0, 'intra'), ('on', 600.0, 'dash'), ('off', 600.0, 'letter_gap') -> 'O'

('on', 200.0, 'dot'), ('off', 200.0, 'intra'), ('on', 200.0, 'dot'), ('off', 200.0, 'intra'), ('on', 200.0, 'dot'), ('off', 1400.0, 'repeat_pause') -> 'S'

('on', 200.0, 'dot'), ('off', 200.0, 'intra'), ('on', 200.0, 'dot'), (