In [1]:
from pathlib import Path
from collections import Counter
from shutil import copy
from joblib import Parallel, delayed

In [2]:
from random import randint
import numpy as np
import matplotlib.pyplot as plt

In [3]:
from _galfitlib.functions.helper_functions import pj, exists, sp
from _galfitlib.utilities.music.galfit_with_music_update import galfitting, process_galfit_output
from audioprocessing.audio_processing_functions import load_audio, time_step_analysis
from audioprocessing.notes import note_ranges_for_search
from visualization.generate_mosaic import add_galaxy_to_mosaic

In [4]:
from IPython.display import Video, display

# Local Helper Functions
`binary_search_ranges` runs a binary search over a list of ranges for every value in a list of
values. It is used to identify the primary notes in the Fourier analysis of the audio signal.

`sum_arrays_in_list` sums all numpy arrays in a list.

`plot_wrapper` is a wrapper for plotting the galaxies and outputting them to file.

`_parallel_plot_wrapper` parallelizes the plotting of the galaxies.

`_parallel_galfit_wrapper` parallelizes the GALFIT process.

In [5]:
# ======================== BEGIN HELPER FUNCTIONS ===============================================

def binary_search_ranges(
        ranges_for_search,
        values : list,
        debug  : bool = False
) -> list[tuple[str, float]]:
    """
    Wrapper for binary search on a list of values in a list of ranges

    Parameters
    ----------
    ranges_for_search : list
        List of dictionaries with "start", "end" and "value" keys describing the ranges
        to search. The search assumes they are ordered by ascending "start".
    values : list
        Values to look up, one search per value
    debug : bool
        If True, print a message whenever a value lies outside every range

    Returns
    -------
    list[tuple[str, float]]
        One (note, offset) pair per input value. The offset is the distance of the value
        from the start of the chosen range, normalized by the length of that range and
        clamped to [0, 1].

    Raises
    ------
    IndexError
        If `ranges_for_search` is empty while `values` is not
    """
    def normalized_offset(note_range : dict, value : float) -> float:
        """
        Position of `value` inside `note_range` as a fraction of its length, kept in [0, 1]
        """
        range_len = note_range["end"] - note_range["start"]

        # A zero-width range has no interior: avoid dividing by zero
        if range_len <= 0:
            return 0.0 if value <= note_range["start"] else 1.0

        # Clamp on both sides. A value in a gap above the range maps to 1,
        # a value below the very first range maps to 0 (it used to go negative).
        return max(0.0, min((value - note_range["start"]) / range_len, 1.0))

    def binary_search(ranges : list, value : float, debug : bool = False) -> tuple[str, float]:
        """
        Binary search for a value in a list of ranges
        Thanks to https://stackoverflow.com/a/65518827 for the original recursive code

        Parameters
        ----------
        ranges : list
            List of dictionaries containing the frequency ranges to search
        value : float
            Value to search for
        debug : bool
            Debug mode

        Returns
        -------
        str
            Note corresponding to the value
        float
            Normalized difference between the value and the start of the frequency range
        """
        # Bisect on indices rather than on list slices: the probes are the same as the
        # recursive version made, without copying the list at every level.
        low, high = 0, len(ranges)
        while high - low > 1:
            mid       = low + (high - low) // 2
            candidate = ranges[mid]

            if candidate["end"] < value:
                low = mid
            elif candidate["start"] > value:
                high = mid
            else:
                return candidate["value"], normalized_offset(candidate, value)

        # Only one candidate is left. The value may still lie outside of it,
        # either in a gap between two ranges or beyond the first/last range.
        nearest = ranges[low]
        if debug and not (nearest["start"] <= value <= nearest["end"]):
            print(f"Value ({value:.2f}) not found in *any* range.")
            print(f"Choosing note: {nearest['value']}")

        return nearest["value"], normalized_offset(nearest, value)

    return [binary_search(ranges_for_search, value, debug) for value in values]

In [6]:
def sum_arrays_in_list(input_list : list[np.ndarray]) -> np.ndarray:
    """
    Add together, element by element, every array in a list

    Parameters
    ----------
    input_list : list
        Arrays to add together; they are stacked, so they must share one shape

    Returns
    -------
    np.ndarray
        Element-wise total of the arrays in the list
    """
    # Pile the arrays along a new leading axis, then collapse that axis
    stacked = np.stack(input_list, axis = 0)
    return stacked.sum(axis = 0)

In [7]:
def plot_wrapper(
        all_images  : list[np.ndarray],
        image_count : int,
        fade_frames : int
):
    """
    Render the fade-in frames for a single galaxy and save them to file.

    The galaxy at index `image_count` is passed to `add_galaxy_to_mosaic` as the secondary
    image on top of the sum of every galaxy before it, once per fade step, each time with a
    stronger alpha mask. Frames are written to `MOSAIC_DIR` as `mosaic_<n>.jpg`.

    Parameters
    ----------
    all_images : list
        Every galaxy image, in the order they appear in the mosaic
    image_count : int
        Index of the galaxy to fade in
    fade_frames : int
        Number of frames over which the galaxy is faded in

    Returns
    -------
    None
    """

    # Background: everything that was already on the mosaic before this galaxy
    if image_count == 1:
        background = all_images[0]

    elif image_count == 0:
        # Nothing has been drawn yet, start from an empty image
        background = np.zeros(np.shape(all_images[0]))

    else:
        background = sum_arrays_in_list(all_images[:image_count])

    # The galaxy being faded in
    current_image = all_images[image_count]

    # Alpha mask covering only the brightest 1% of pixels, as an attempt to
    # restrict the fade to the area around the core of the galaxy
    brightest_pixels = current_image > np.percentile(current_image, 99)
    fade_mask        = np.zeros(np.shape(current_image))
    fade_mask[brightest_pixels] = 1

    # One more step than frames, so the mask never reaches full opacity here
    n_steps     = fade_frames + 1
    first_frame = image_count * fade_frames

    for step in range(1, n_steps):
        add_galaxy_to_mosaic(
                1,
                [background],
                100000,
                save            = True,
                filename        = pj(MOSAIC_DIR, f"mosaic_{first_frame + step}.jpg"),
                save_alpha      = (step / n_steps) * fade_mask,
                secondary_image = current_image
        )

In [8]:
def _parallel_plot_wrapper(
        all_images,
        fade_frames = 8
):
    """
    Fan the plotting of the galaxies out over several processes.

    One `plot_wrapper` task is queued per entry of `all_images`; the arguments are
    handed to it unchanged, so see its documentation for their meaning.
    """

    render_one    = delayed(plot_wrapper)
    image_indices = range(len(all_images))

    # Lazily describe one task per galaxy ...
    tasks = (
            render_one(all_images, image_index, fade_frames)
            for image_index in image_indices
    )

    # ... and run them; the return values are of no interest
    Parallel(n_jobs = -2, backend='multiprocessing')(tasks)

In [9]:
def _parallel_galfit_wrapper(
        feedme_0,
        input_filename_0,
        output_filename_0,
        gname,
        x_pos,
        y_pos,
        model_offset,
        note_priority_map,
        notes_chunk,
        norm_volume,
        image_array,
        height,
        width,
        model_dim,
        seed_value = None
):
    """
    Parallelizes the GALFIT process.

    One task is queued per (notes, volume) pair taken from `notes_chunk` and `norm_volume`.
    For each pair `galfitting` is called and its result is unpacked into
    `process_galfit_output`.

    Parameters
    ----------
    feedme_0, input_filename_0, output_filename_0, gname, height, width, model_offset,
    note_priority_map, seed_value
        Forwarded unchanged to `galfitting`; please see the documentation therein.
    notes_chunk : sequence
        One entry of notes per audio chunk; iterated in step with `norm_volume`
    norm_volume : sequence
        One volume per audio chunk; iterated in step with `notes_chunk`
    x_pos, y_pos, image_array, model_dim
        Accepted for compatibility with existing callers; not used here.

    Returns
    -------
    list
        The results collected by `Parallel`, one per (notes, volume) pair

    Notes
    -----
    Only `process_galfit_output` is wrapped in `delayed`. The call to `galfitting` is an
    argument expression, so it is evaluated in the calling process while the task
    generator is consumed.
    """

    # Iterate over the *arguments*. The loop variables used to be named like the
    # parameters and the iterables were the notebook globals `list_o_notes` and
    # `norm_volumes`, so whatever the caller passed in was silently ignored.
    tasks = (
            delayed(process_galfit_output)(
                    *galfitting(
                            feedme_0,
                            input_filename_0,
                            output_filename_0,
                            gname,
                            height,
                            width,
                            model_offset,
                            note_priority_map,
                            chunk_notes,
                            chunk_volume,
                            i,
                            seed_value = seed_value
                    )
            ) for i, (chunk_notes, chunk_volume) in enumerate(zip(notes_chunk, norm_volume))
    )

    # Run GALFIT!
    return Parallel(n_jobs = -2, backend='multiprocessing')(tasks)

In [10]:
# ===================== END HELPER FUNCTIONS ===============================================

In [11]:
# These can be used with jupyter notebooks
#%matplotlib inline
#%load_ext autoreload
#%autoreload 2
#%matplotlib notebook

# Main Script
The following cell contains the hyperparameters for the script. 
The most important hyperparameter is `music_filename`, the music file. Please select any audio 
that you like. The script will process that audio and generate a galaxy mosaic image and video
based on the Fourier analysis of the music. The video shows the formation of the 
galaxy mosaic, set to the music.

In [12]:
# ======================== BEGIN HYPERPARAMETERS ===============================================
# Everything a user is expected to tune lives in this cell. The names defined here are
# notebook-level globals that the following cells read directly.
if __name__ == "__main__":
    # Clear the current matplotlib figure before anything is drawn
    plt.clf()

    # SELECT YOUR MUSIC
    music_filename = "brown_alma_mater.mp3"
    
    # Image size hyperparameters (size of the full mosaic)
    height = 1000
    width  = 1000
    # square
    # Side length of a single galaxy model, and half of it
    model_dim    = 250
    model_offset = model_dim//2
    
    # Number of frames to fade in the galaxies
    fade_frames = 16

    # Wait half a second before starting the analysis
    # NOTE(review): how `load_audio` applies `delay` is not visible here — confirm
    delay = 0.5
    signal, sample_rate, time_array = load_audio(music_filename, delay = delay)

    # Process every 2 seconds of the song
    time_step   = 2
    # Length of one analysis window in samples
    t_step_size = time_step * sample_rate
    # End index of the first analysis window: the delay in samples plus one full window
    t_initial   = int(delay * sample_rate) + t_step_size
    
# ============================ END HYPERPARAMETERS =================================================

<Figure size 640x480 with 0 Axes>

# Set directories and initialize filenames

In [14]:
# ============================ BEGIN DIRECTORY THINGS ==========================================
if __name__ == "__main__":
    # Assume running from the galaxymusic top directory
    # if not, please copy the music-in directory to the directory where this script is located
    # This holds the template galaxy needed by GALFIT. Everything else will be done for you.
    cwd = Path.cwd()

    IN_DIR      = pj(cwd, "music-in")
    TMP_DIR     = pj(cwd, "music-tmp")
    MOSAIC_DIR  = pj(TMP_DIR, "mosaics")
    GALFITS_DIR = pj(TMP_DIR, "galfits")
    OUT_DIR     = pj(cwd, "music-out")

    # Make the temporary directories for the mosaic images and the GALFIT output, and the
    # directory for the final products.
    # `parents = True` also creates TMP_DIR, which does not exist on a fresh checkout;
    # `exist_ok = True` makes the cell safe to re-run.
    for working_dir in (MOSAIC_DIR, GALFITS_DIR, OUT_DIR):
        working_dir.mkdir(parents = True, exist_ok = True)

    gname = "template"

    # GALFIT input file, template image and GALFIT output image
    feedme_0          = pj(GALFITS_DIR, f"{gname}.in")
    input_filename_0  = pj(IN_DIR, f"{gname}.fits")
    output_filename_0 = pj(GALFITS_DIR, f"{gname}.fits")

    # Nothing can be generated without the template galaxy, so fail early
    if not exists(input_filename_0):
        raise FileNotFoundError(f"Template galaxy not found in {IN_DIR}")

# ============================== END DIRECTORY THINGS ==============================================

# Load the audio file and process it
We load the audio file and process it in chunks of `time_step` seconds. The Fourier analysis is
performed on each chunk and the primary notes are identified. The volume of each chunk is also
found and used in the generation of the galaxies in the mosaic. 

In [15]:
# ============================== BEGIN PROCESSING RECORDED AUDIO ===================================
# Walk through the signal in windows of `t_step_size` samples and collect, per window,
# the notes found and the volume.
if __name__ == "__main__":
    # Initializing some lists
    # list_o_notes : one list of (note, offset) pairs per analysed window
    # volumes      : one volume per analysed window
    # all_images   : placeholder, filled by the model-creation cell further down
    list_o_notes = []
    volumes      = []
    all_images   = []

    # Written as such to prepare for async processing with live audio.
    # Async is not needed for now.
    # `t_step` is the (exclusive) end index of the current window
    for t_step in range(t_initial, len(time_array), t_step_size):

        # The window of samples ending at t_step
        signal_cut = signal[t_step - t_step_size : t_step]

        # Select frequency peaks and get RMS volume
        selected_peaks, volume = time_step_analysis(signal_cut, sample_rate)
        # No peaks reported for this window: skip it entirely (no notes, no volume stored)
        if selected_peaks is None:
            continue

        # Search for notes by their frequency range using the selected frequency peaks
        peak_notes = binary_search_ranges(note_ranges_for_search, selected_peaks)

        # Store
        list_o_notes.append(peak_notes)
        volumes.append(volume)
    
# ============================== END PROCESSING RECORDED AUDIO =====================================

# Prepare for GALFIT
We prepare the data we have just processed from the music to be used in the GALFIT process.
This is mostly some re-organizing of data structures and the identification of the most common
notes found in the audio segment.

In [16]:
# ============================== BEGIN PREPARATION FOR GALFIT  =====================================
if __name__ == "__main__":
    # Counter wants one flat list, so pull the note name (first entry of every
    # (note, offset) pair) out of every window
    flattened_list_o_notes = [note[0] for sublist in list_o_notes for note in sublist]

    # Note -> number of occurrences, ordered from most to least frequent
    sorted_counter = dict(
        sorted(
            Counter(flattened_list_o_notes).items(),
            key     = lambda note_and_count: note_and_count[1],
            reverse = True
        )
    )

    # Map every note to its rank in that ordering, aka its 'priority number'
    # (0 for the note that shows up most often in the Fourier analysis)
    note_priority_map = dict(zip(sorted_counter, range(len(sorted_counter))))

    # Normalize volumes
    volumes      = np.array(volumes)
    min_volume   = volumes.min()
    max_volume   = volumes.max()
    norm_volumes = (volumes - min_volume)/(1 + max_volume - min_volume)

    # Empty canvases for the model generation loop
    image_array      = np.zeros((height, width))
    full_image_array = np.zeros((height, width))

    # Seed for the model generation.
    # With None the seed is taken from the current time.
    initial_seed_value = None

# ============================== END PREPARATION FOR GALFIT  =======================================

# Create Models and Output FITS images
In the cell below, we create the models and output the FITS images for each galaxy. This is done
in parallel. The two functions used here are `galfitting` and `process_galfit_output`.
The `galfitting` function is used to generate the models and output the FITS images 
while the `process_galfit_output` function is used to process the output of GALFIT and extract the
raw data from the FITS images.

In [17]:
# ========================= CREATE MODELS AND OUTPUT FITS IMAGES ===================================
if __name__ == "__main__":
    # Generate every galaxy model in parallel; arguments are spelled out by name
    # so that each value can be matched to the wrapper's signature at a glance
    all_images = _parallel_galfit_wrapper(
            feedme_0          = feedme_0,
            input_filename_0  = input_filename_0,
            output_filename_0 = output_filename_0,
            gname             = gname,
            x_pos             = randint(0, width - model_dim),
            y_pos             = randint(0, height - model_dim),
            model_offset      = model_offset,
            note_priority_map = note_priority_map,
            notes_chunk       = list_o_notes,
            norm_volume       = norm_volumes,
            image_array       = image_array,
            height            = height,
            width             = width,
            model_dim         = model_dim,
            seed_value        = initial_seed_value
    )

#  Create Galaxy Images and Mosaic
The galaxy images are created and placed on the mosaic. The galaxy images are faded in over
`fade_frames` number of frames. The final mosaics are saved as jpg images in the `MOSAIC_DIR` 
directory of `TMP_DIR`. A final version of the entire mosaic can be found in the `OUT_DIR`.

In [18]:
# ============================== CREATE GALAXY IMAGES AND MOSAIC ===================================
if __name__ == "__main__":
    # Write the fade-in frames of every galaxy, in parallel
    _parallel_plot_wrapper(all_images, fade_frames)

    # The finished mosaic is the sum of all galaxies
    full_image_array = sum_arrays_in_list(all_images)

    # Frame number and file of the finished mosaic
    almost_final_image_count = fade_frames*len(all_images)
    final_filename = pj(MOSAIC_DIR, f"mosaic_{almost_final_image_count}.jpg")

    # One last plot, without an alpha mask
    add_galaxy_to_mosaic(
            1,
            [full_image_array],
            100000,
            save            = True,
            filename        = final_filename,
            secondary_image = full_image_array
    )

    # Hold the finished mosaic for a few extra frames at the end
    for i in range(1, fade_frames // 2 + 1):
        copy(final_filename, pj(MOSAIC_DIR, f"mosaic_{almost_final_image_count + i}.jpg"))

    # Keep a copy of the finished mosaic next to the other final products
    copy(final_filename, pj(OUT_DIR, "mosaic.jpg"))

<Figure size 640x480 with 0 Axes>

In [19]:
# #from visualization.generate_mosaic import generate_animation
# # Create the galaxy mosaic animation
# _ = generate_animation(
#         all_images,
#         time_step,
#         filename = pj(MOSAIC_DIR, f"mosaic.gif")
# )

# Generate the Mosaic Video
Finally, we generate the mosaic video. This is done using ffmpeg to combine the static images
together and to also add the music to the video. The video is ultimately saved in `OUT_DIR`.

In [20]:
# ============================== GENERATE MOSAIC VIDEO =============================================
if __name__ == "__main__":
    # Using ffmpeg
    output_video = pj(OUT_DIR, "mosaic.mp4")

    # Remove the video of a previous run before writing a new one
    if exists(output_video):
        output_video.unlink()

    mosaic_image_prefix = pj(MOSAIC_DIR, "mosaic")

    # Every `time_step` seconds of music is covered by `fade_frames` frames.
    # True division keeps video and audio in step when the two do not divide evenly
    # (floor division used to round the rate down); ':g' still prints whole rates
    # without a decimal point.
    frame_rate = fade_frames / time_step

    # ffmpeg works best with jpg
    # NOTE(review): the paths are interpolated unquoted, so they must not contain
    # spaces — how `sp` tokenizes the command is not visible here.
    process = sp(f"ffmpeg -framerate {frame_rate:g} -i {mosaic_image_prefix}_%d.jpg -i {music_filename} -vcodec libx264 {output_video}")

# Play the Video (if possible)
Finally, we play the video if the IDE supports it.

In [21]:
if __name__ == "__main__":
    # Play the video if compatible with the IDE.
    # A bare `Video(...)` expression nested inside this `if` block is not the cell's
    # last top-level expression, so it is never rendered; display it explicitly.
    display(Video(output_video))

# Clean Up
If desired, the entire `TMP_DIR` can be removed. This will remove all the temporary files
generated by GALFIT and the images used to create the video.

In [None]:
if __name__ == "__main__":
    # Clean up
    # Set to True to delete every temporary file generated by GALFIT
    # and every image used to create the video.
    clean_up = False
    if clean_up:
        from shutil import rmtree

        # Remove the temporary directory together with its contents.
        # `Path.rmdir()` only removes *empty* directories and TMP_DIR still holds
        # the mosaic and GALFIT sub-directories at this point.
        rmtree(TMP_DIR)