In [130]:
import taichi as ti
from ipycanvas import Canvas
from IPython.display import display
import ipywidgets as widgets
import time
import numpy as np
import signalflow as sf
from PIL import Image

# Initialize Taichi
ti.init(arch=ti.cpu)

################################################## Global Vars ##################################################
# 🌍


n = 500  # Image size (n x n)
img_path = "/Users/balintl/Documents/GitHub/sonification/experiments/lsm_paper/figures/cellular_seq/Timepoint_001_220518-ST_C03_s1.jpg"
padding = 40 # Padding around the image
refresh_interval = 1 / 60  # Minimum time between redraws (in seconds)


################################################## Data ##################################################
# 📊


image = ti.Vector.field(3, dtype=float, shape=(n, n))  # RGB field for colors
bg = ti.Vector.field(3, dtype=float, shape=(n, n))  # Background image
probe = ti.Vector.field(3, dtype=float, shape=(n, n))  # probe

# Cursor position and mouse state
cursor_pos = ti.Vector.field(2, dtype=int, shape=())
mouse_pressed = ti.field(dtype=int, shape=())  # 1 if mouse button is pressed, 0 otherwise
probe_size = ti.field(dtype=int, shape=(2))  # Size of the probe (normalized)
probe_stroke = 1  # Stroke width for the probe

# read a background image into a numpy array
img = Image.open(img_path)
img = img.resize((n, n))
img_data = np.array(img)
img_data_norm = img_data / 255.0  # Normalize to [0, 1]
bg.from_numpy(img_data_norm)
bg_np = img_data


################################################## Kernels ##################################################
# 💪


@ti.kernel
def draw_image():
    """Render the image and the probe over it."""
    center_x, center_y = cursor_pos[None]
    probe_w = probe_size[0]
    probe_h = probe_size[1]
    color = ti.Vector([1.0, 0.0, 0.0]) if mouse_pressed[None] else ti.Vector([1.0, 1.0, 1.0])  # Red or white

    for i, j in image:
        dist_x = abs(i - center_x)
        dist_y = abs(j - center_y)
        if dist_x <= probe_w//2 and dist_y <= probe_h//2 and (dist_x >= probe_w//2 - probe_stroke or dist_y >= probe_h//2 - probe_stroke):
                image[i, j] = color  # Probe color
        else:
            image[i, j] = bg[i, j]  # Background image


@ti.kernel
def draw_probe():
    """Render the probe contents."""
    center_x, center_y = cursor_pos[None]
    probe_w = probe_size[0]
    probe_h = probe_size[1]

    for i, j in probe:
        if i < probe_w and j < probe_h:
            # get bg under square
            bg_x = i + center_x - (probe_w // 2)
            bg_y = j + center_y - (probe_h // 2)
            probe[i, j] = bg[bg_x, bg_y]
        else:
            probe[i, j] = ti.Vector([1.0, 1.0, 1.0]) # White


################################################## GUI ##################################################
# 🖥️


# Create a canvas
canvas = Canvas(width=n*2 + padding*2, height=n + padding*2)
display(canvas)

# Create two sliders for controlling the Probe size
probe_w_slider = widgets.IntSlider(
    value=50,  # Initial value
    min=1,  # Minimum size
    max=n,   # Maximum size
    step=1, # Step size
    description='Probe Width',
)
display(probe_w_slider)

probe_h_slider = widgets.IntSlider(
    value=50,  # Initial value
    min=1,  # Minimum size
    max=n,   # Maximum size
    step=1, # Step size
    description='Probe Height',
)
display(probe_h_slider)

# Create a toggle button for audio
audio_graph_toggle = widgets.ToggleButton(
    value=False,
    description='Audio',
    disabled=False,
    button_style='', # 'success', 'info', 'warning', 'danger' or ''
    tooltip='Toggle audio',
    icon="speaker"
)
display(audio_graph_toggle)

# Create a master volume slider
master_volume_slider = widgets.FloatSlider(
    value=0,  # Initial value
    min=-36,  # Minimum value
    max=0,   # Maximum value
    step=0.01, # Step size
    description='Master Volume (dB)',
)
display(master_volume_slider)

# Create two text widgets for displaying the mouse position in px
mouse_x_text = widgets.Text(
    value='',
    description='MouseX:',
    disabled=True
)
mouse_y_text = widgets.Text(
    value='',
    description='MouseY:',
    disabled=True
)
display(mouse_x_text)
display(mouse_y_text)


################################################## DSP ##################################################
# 🔈


graph = None
try:
    graph.destroy()
    graph = sf.AudioGraph(start=False)
except:
    graph = sf.AudioGraph(start=False)
    graph.clear()

def mix2samps(mixval, eps=1e-6):
    "Convert a mix value (used in sf.Smooth) to samples"
    return np.ceil(np.log(eps) / np.log(mixval))

def samps2mix(samps, eps=1e-6):
    "Convert samples to a mix value (used in sf.Smooth)"
    return eps ** (1 / samps)

# Synth
class Theremin(sf.Patch):
    def __init__(self, frequency=440, amplitude=0.5, smooth_n_samps=24000):
        super().__init__()
        self.input_buffers = {}
        self.params = {
            "frequency": {
                "min": 20,
                "max": 20000,
                "default": 440,
                "unit": "Hz",
                "buffer": None
            },
            "amplitude": {
                "min": 0,
                "max": 1,
                "default": 0.5,
                "unit": "",
                "buffer": None
            }
        }

        buf_frequency = sf.Buffer(1, 1)
        buf_amplitude = sf.Buffer(1, 1)
        
        buf_frequency.data[0][0] = frequency
        self.params["frequency"]["buffer"] = buf_frequency
        
        buf_amplitude.data[0][0] = amplitude
        self.params["amplitude"]["buffer"] = buf_amplitude
        
        frequency_value = sf.BufferPlayer(buf_frequency, loop=True)
        amplitude_value = sf.BufferPlayer(buf_amplitude, loop=True)
        
        freq_smooth = sf.Smooth(frequency_value, samps2mix(smooth_n_samps))
        amplitude_smooth = sf.Smooth(amplitude_value, samps2mix(smooth_n_samps))
        
        sine = sf.SineOscillator(freq_smooth)
        output = sf.StereoPanner(sine * amplitude_smooth, pan=0)
        
        self.set_output(output)

    def set_input_buf(self, name, value):
        self.params[name]["buffer"].data[0][0] = value

    def __getitem__(self, key):
        return self.params[key]


class Feature():
    """Feature base class"""
    def __init__(self, num_features=1, name="Feature"):
        self.num_features = num_features
        self.name = name
        self.features = sf.Buffer(1, num_features)
        self.min = np.zeros_like(self.features.data) * 1e6
        self.max = np.zeros_like(self.features.data) * -1e6
        self.create_widget()

    def __call__(self, mat):
        self.features.data[:, :] = self.compute(mat)
        self.update()
    
    def compute(self, mat):
        return NotImplemented

    def create_widget(self):
        # create a widget with two text boxes for min and max
        # create a box where all the text boxes go
        self.widgets = []
        for i in range(self.num_features):
            min_text = widgets.Text(
                value='',
                description=f'{self.name}_{i}_Min:',
                disabled=True
            )
            max_text = widgets.Text(
                value='',
                description=f'{self.name}_{i}_Min:',
                disabled=True
            )
            last_text = widgets.Text(
                value='',
                description=f'{self.name}_{i}_Last:',
                disabled=True
            )
            self.widgets.append((min_text, max_text, last_text))
        # attach them to the display
        for min_text, max_text, last_text in self.widgets:
            display(min_text)
            display(max_text)
            display(last_text)
        
        # create a reset button
        self.reset_btn = widgets.Button(description=f'Reset {self.name} MinMax')
        display(self.reset_btn)
        self.reset_btn.on_click(self.reset_minmax)

    def update_minmax(self):
        self.min = np.minimum(self.min, self.features.data)
        self.max = np.maximum(self.max, self.features.data)

    def update_widget(self):
        for i, (min_text, max_text, last_text) in enumerate(self.widgets):
            min_text.value = str(self.min[0][i])
            max_text.value = str(self.max[0][i])
            last_text.value = str(self.features.data[0][i])

    def update(self):
        self.update_minmax()
        self.update_widget()

    def reset_minmax(self, _ = None):
        self.min = np.zeros_like(self.features.data) * 1e6
        self.max = np.zeros_like(self.features.data) * -1e6
        self.update_widget()


class MeanPixelValue(Feature):
    """Compute the mean pixel value within a probe."""
    def __init__(self):
        super().__init__(num_features=1, name="MeanPixelValue")

    def compute(self, mat):
        return np.mean(mat)
    
# @jit(nopython=True)
# TODO: add numba support
def scale_array_exp(
    x: np.ndarray,
    in_low: np.ndarray,
    in_high: np.ndarray,
    out_low: np.ndarray,
    out_high: np.ndarray,
    exp: float = 1.0,
) -> np.ndarray:
    """
    Scales an array of values from one range to another. Based on the Max/MSP scale~ object.

    Args:
        x (np.ndarray): The array of values to scale.
        in_low (np.ndarray): The lower bound of the input range.
        in_high (np.ndarray): The upper bound of the input range.
        out_low (np.ndarray): The lower bound of the output range.
        out_high (np.ndarray): The upper bound of the output range.
        exp (float, optional): The exponent to use for the scaling. Defaults to 1.0.

    Returns:
        np.ndarray: The scaled array.
    """
    if in_high == in_low:
        return np.ones_like(x, dtype=np.float64) * out_high
    else:
        return np.where(
            (x-in_low)/(in_high-in_low) == 0,
            out_low,
            np.where(
                (x-in_low)/(in_high-in_low) > 0,
                out_low + (out_high-out_low) *
                ((x-in_low)/(in_high-in_low))**exp,
                out_low + (out_high-out_low) * -
                ((((-x+in_low)/(in_high-in_low)))**(exp))
            )
        )


class Mapper():
    """Map between two buffers. Typically from a feature buffer to a parameter buffer."""
    def __init__(
            self, 
            obj_in, 
            obj_out,
            in_low = None,
            in_high = None,
            out_low = None,
            out_high = None,
            exponent = 1,
            clamp: bool = True,

    ):
        self.obj_in = obj_in
        self.obj_out = obj_out

        # if the input object is an instance of a feature, then we want to map the output of the feature
        # to the input of the object
        if isinstance(self.obj_in, Feature):
            self.buf_in = self.obj_in.features
        elif isinstance(self.obj_in, dict):
            self.buf_in = self.obj_in["buffer"]
        else:
            raise ValueError("Input object must be a Feature or a dict")

        # expecting a synth's param dict here
        self.buf_out = self.obj_out["buffer"]

        # save scaling parameters
        self._in_low = in_low
        self._in_high = in_high
        self._out_low = out_low
        self._out_high = out_high
        self.exponent = exponent
        self.clamp = clamp

    @property
    def in_low(self):
        if self._in_low is None:
            if isinstance(self.obj_in, Feature):
                return self.obj_in.min
            elif isinstance(self.obj_in, dict):
                return self.obj_in["min"]
        else:
            return self._in_low
    
    @property
    def in_high(self):
        if self._in_high is None:
            if isinstance(self.obj_in, Feature):
                return self.obj_in.max
            elif isinstance(self.obj_in, dict):
                return self.obj_in["max"]
        else:
            return self._in_high

    @property
    def out_low(self):
        if self._out_low is None:
            return self.obj_out["min"]
        else:
            return self._out_low

    @property
    def out_high(self):
        if self._out_high is None:
            return self.obj_out["max"]
        else:
            return self._out_high

    def map(self):
        # scale the input buffer to the output buffer
        self.buf_out.data[:,:] = scale_array_exp(
            self.buf_in.data,
            self.in_low,
            self.in_high,
            self.out_low,
            self.out_high,
            self.exponent
        )
        if self.clamp:
            self.buf_out.data[:, :] = np.clip(self.buf_out.data[:, :], self.out_low, self.out_high)

    def __call__(self):
        self.map()


# Create objects
mean_pix = MeanPixelValue()
theremin = Theremin()
pix2freq = Mapper(mean_pix, theremin["frequency"], exponent=2, out_high=1000)

# DSP switch
dsp_switch_buf = sf.Buffer(1, 1)
dsp_switch_buf.data[0][0] = 0
dsp_switch = sf.BufferPlayer(dsp_switch_buf, loop=True)

# Master volume
master_slider_db = sf.Constant(master_volume_slider.value)
master_slider_a = sf.DecibelsToAmplitude(master_slider_db)
master_volume = sf.Smooth(master_slider_a * dsp_switch, samps2mix(24000))

audio_out = theremin * master_volume

audio_out.play()


################################################## Interaction ##################################################
# 🖱 ️

def redraw():
    """Render new frames for all kernels, then update the HTML canvas with the results."""
    global is_drawing

    if is_drawing:
        return  # Skip if we're already drawing

    is_drawing = True
    try:
        # Render the image with the probe
        draw_image()
        img_data = (image.to_numpy() * 255).astype(np.uint8)  # Scale to [0, 255] and convert to uint8
        img_data = np.transpose(img_data, (1, 0, 2))  # Transpose to match the canvas shape
        canvas.put_image_data(img_data, padding, padding)

        # Render the probe contents
        draw_probe()
        probe_data = (probe.to_numpy() * 255).astype(np.uint8)  # Scale to [0, 255] and convert to uint8
        probe_data = np.transpose(probe_data, (1, 0, 2))  # Transpose to match the canvas shape
        canvas.put_image_data(probe_data, n+20+padding, padding)
    finally:
        is_drawing = False  # Reset drawing state


def get_probe_matrix(mat):
    """Get the probe matrix from the background image."""
    x, y = cursor_pos[None]
    probe_w = probe_size[0]
    probe_h = probe_size[1]
    x_from = max(x - probe_w//2, 0)
    y_from = max(y - probe_h//2, 0)
    probe = mat[x_from : x_from + probe_w, y_from : y_from + probe_h]
    return probe


def handle_mouse_event(x, y, pressed: int = 0):
    """Handle mouse, compute probe features, update synth(s), and render kernels."""
    global latest_event_time

    probe_w = probe_size[0]
    probe_h = probe_size[1]
    # clamp x and y to the image size (undo padding) and also no less than half of the probe sides, so that the mouse is always in the middle of the probe
    x_clamped = np.clip(x-padding, probe_w//2, n-1-probe_w//2)
    y_clamped = np.clip(y-padding, probe_h//2, n-1-probe_h//2)
    cursor_pos[None] = [x_clamped, y_clamped]
    mouse_x_text.value = str(x_clamped)
    mouse_y_text.value = str(y_clamped)

    # Update mouse state
    if pressed == 2:
        mouse_pressed[None] = 1
        dsp_switch_buf.data[0][0] = 1
    elif pressed == 3:
        mouse_pressed[None] = 0
        dsp_switch_buf.data[0][0] = 0
    
    # Drop excess events over the refresh interval
    current_time = time.time()
    if current_time - latest_event_time < refresh_interval and pressed < 2: # only skip if mouse is up
        return  # Skip if we are processing too quickly
    latest_event_time = current_time  # Update the last event time

    # Get probe matrix
    probe_mat = get_probe_matrix(bg_np)

    # Compute probe features
    mean_pix(probe_mat)

    # Update mappings
    pix2freq()
    
    # Compute all kernels
    redraw()

# GUI callbacks

# Update probe size from sliders
def update_probe_width(change):
    probe_size[0] = change['new']
    redraw()

def update_probe_height(change):
    probe_size[1] = change['new']
    redraw()

# Toggle DSP
def toggle_audio(change):
    if change['new']:
        graph.start()
    else:
        graph.stop()

def update_master_volume(change):
    master_slider_db.set_value(change['new'])

# Mousing event listeners
canvas.on_mouse_move(lambda x, y: handle_mouse_event(x, y, 1 if mouse_pressed[None] == 1 else 0))  # Triggered during mouse movement (keeps track of mouse button state)
canvas.on_mouse_down(lambda x, y: handle_mouse_event(x, y, pressed=2))  # When mouse button pressed
canvas.on_mouse_up(lambda x, y: handle_mouse_event(x, y, pressed=3))  # When mouse button released

# GUI event listeners
probe_w_slider.observe(update_probe_width, names='value')
probe_h_slider.observe(update_probe_height, names='value')
audio_graph_toggle.observe(toggle_audio, names='value')
master_volume_slider.observe(update_master_volume, names='value')


################################################## INIT ##################################################
# 🚀


# Global state variables
is_drawing = False
latest_event_time = time.time()
# Initial draw
image.fill(0)  # Clear the field
cursor_pos[None] = [n//2, n//2]  # Start with the cursor in the center
mouse_pressed[None] = 0  # Start with mouse unpressed
probe_size[0], probe_size[1] = [probe_w_slider.value, probe_h_slider.value]
redraw()


[Taichi] Starting on arch=arm64


Canvas(height=580, width=1080)

IntSlider(value=50, description='Probe Width', max=500, min=1)

IntSlider(value=50, description='Probe Height', max=500, min=1)

ToggleButton(value=False, description='Audio', icon='speaker', tooltip='Toggle audio')

FloatSlider(value=0.0, description='Master Volume (dB)', max=0.0, min=-36.0, step=0.01)

Text(value='', description='MouseX:', disabled=True)

Text(value='', description='MouseY:', disabled=True)

AudioGraph: The global audio graph has already been created. To create a new graph, call .destroy() first.


Text(value='', description='MeanPixelValue_0_Min:', disabled=True)

Text(value='', description='MeanPixelValue_0_Min:', disabled=True)

Text(value='', description='MeanPixelValue_0_Last:', disabled=True)

Button(description='Reset MeanPixelValue MinMax', style=ButtonStyle())

In [67]:
graph.status

'AudioGraph: 10 active nodes, 0 patches, 1.49% CPU usage, 0.3MB memory usage, output = -9.0dB'

In [85]:
pix2freq.buf_in.data

array([[85.653336]], dtype=float32)

In [86]:
pix2freq.buf_out.data

array([[11931.974]], dtype=float32)

In [87]:
pix2freq.in_low, pix2freq.in_high, pix2freq.out_low, pix2freq.out_high, pix2freq.exponent, pix2freq.clamp

(array([[0.]], dtype=float32),
 array([[143.66667]], dtype=float32),
 20,
 20000,
 1,
 True)

In [88]:
mean_pix.min, mean_pix.max

(array([[0.]], dtype=float32), array([[143.66667]], dtype=float32))

In [95]:
probe_mat = get_probe_matrix(bg_np, 250, 250, probe_size, n)
probe_mat

array([], shape=(0, 250, 3), dtype=uint8)

In [124]:
probe_size[0], probe_size[1]

(500, 500)

In [123]:
1//2, 500//2

(0, 250)

In [122]:
np.clip(499, 0, 499)

np.int64(499)