In [6]:
from LLMPromptGenerator import LLMPromptGenerator, DetailedInfo
from GenMusicFromPrompt import GenMusicFromPrompt

from audiocraft.models import MusicGen
from audiocraft.models import MultiBandDiffusion
from audiocraft.utils.notebook import display_audio

from transformers import AutoModelForSpeechSeq2Seq, AutoProcessor, pipeline


import math
import torchaudio
import torch
from audiocraft.utils.notebook import display_audio
from tqdm import tqdm
import json
import random
import os
from datasets import load_dataset
from audiocraft.data.audio_utils import convert_audio
import time

In [7]:
# from pydub import AudioSegment
# sound = AudioSegment.from_mp3("./conference_test/2023-10-5070-russell-m-nelson-32k-eng.mp3")
# sound.export("./conference_test/russell_m_nelson_10_23.wav", format="wav")

In [1]:

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

GROUP_WORD_COUNT = 60 #120 
SONG_DUR_SECONDS = 30 #60 
PREV_SONG_DUR = 2 # 4

MAX_GROUP_CNT = 3 # limits the number of groups to be processing

NameError: name 'torch' is not defined

In [9]:
extractor = LLMPromptGenerator() # device=device

generator = GenMusicFromPrompt(device=device)


KeyboardInterrupt: 

In [None]:
def load_word_sections(file_path, desired_section_size):
    with open(file_path) as f: 
        book_text = f.read()

    words = book_text.split()

    # Calculate the total number of words
    total_words = len(words)

    # Determine the number of sections, aiming for equally sized sections
    # Calculate the optimal number of sections to avoid a significantly shorter final section
    optimal_num_sections = round(total_words / desired_section_size)

    # Calculate the new section size to more evenly distribute words across sections
    new_section_size = total_words // optimal_num_sections if total_words % optimal_num_sections == 0 else (total_words // optimal_num_sections) + 1

    # Adjust the last section to avoid being too short
    if total_words % new_section_size < new_section_size / 2:
        optimal_num_sections += 1

    word_sections = [' '.join(words[i:i+new_section_size]) for i in range(0, total_words, new_section_size)]
    return word_sections

In [None]:
class Music_Gen_Pipeline():
    
    def __init__(self,
                audio_pipe=None,
                # audio_model=None,
                # audio_processor=None,
                extractor=None,
                generator=None,
                # song_dur_seconds=SONG_DUR_SECONDS,
                # previous_song_duration=PREV_SONG_DUR,
                device="cpu",
                verbose=True,
                desired_section_size=GROUP_WORD_COUNT) -> None:
        # self.audio_model = audio_model
        # self.audio_processor = audio_processor
        self.audio_pipe = audio_pipe
        self.audio_device = 'cpu'
        # self.audio_device = 'cuda' if torch.cuda.is_available() else 'cpu'
        
        
        self.extractor = extractor
        self.generator = generator
        
        # self.previous_song_duration = previous_song_duration
        self.device = device
        self.verbose = verbose
        self.desired_section_size = desired_section_size
        
        self.original_audio = None

        if self.extractor is None:
            self.extractor = LLMPromptGenerator() #device=self.device)
            if self.verbose:
                print("Extractor not provided, using default")
        if self.generator is None:
            self.generator = GenMusicFromPrompt(device=self.device)
            if self.verbose:
                print("Generator not provided, using default")
    
    
    def load_txt(self, file_path):
        with open(file_path) as f: 
            book_text = f.read()
        return book_text
                
    def text_to_sections(self, text, desired_section_size=None, max_group_count=None):
        
        # If the input is a file path, load the text from the file
        if os.path.exists(text):
            text = self.load_txt(text)
        
        words = text.split()
        if desired_section_size is None:
            desired_section_size = self.desired_section_size

        # Calculate the total number of words
        total_words = len(words)

        # Determine the number of sections, aiming for equally sized sections
        # Calculate the optimal number of sections to avoid a significantly shorter final section
        optimal_num_sections = round(total_words / desired_section_size)

        # Calculate the new section size to more evenly distribute words across sections
        new_section_size = total_words // optimal_num_sections if total_words % optimal_num_sections == 0 else (total_words // optimal_num_sections) + 1

        # Adjust the last section to avoid being too short
        if total_words % new_section_size < new_section_size / 2:
            optimal_num_sections += 1

        word_sections = [' '.join(words[i:i+new_section_size]) for i in range(0, total_words, new_section_size)]
        
        if max_group_count is not None and len(word_sections) > max_group_count:
            word_sections = word_sections[:max_group_count]
        
        return word_sections
    
    def audio_to_sections(self, wav_audio, default_chunk_length_s=30, desired_lengths=20, max_length=30, last_chunk_buffer=0):
        def group_chunks(chunks, desired_lengths=20, max_length=30):
            def duration(chunk):
                return chunk['timestamp'][1] - chunk['timestamp'][0]
            new_chunks = []
            current_chunk = None
            for chunk in chunks:
                if current_chunk is not None:
                    new_duration = duration(current_chunk) + duration(chunk)
                    
                    if new_duration > max_length:
                        new_chunks.append(current_chunk)
                        current_chunk = None
                    else:
                        current_chunk['timestamp'][1] = chunk['timestamp'][1]
                        current_chunk['duration'] = new_duration
                        current_chunk['text'] += chunk['text']
                        if new_duration > desired_lengths and current_chunk['text'][-1] in ['.', '!', '?', '\n']:
                            new_chunks.append(current_chunk)
                            current_chunk = None
                            continue

                if current_chunk is None:
                    current_chunk = chunk
                    print(chunk)
                    current_chunk['timestamp'] = list(chunk['timestamp'])
                    current_chunk['duration'] = duration(current_chunk)
                    continue
            if current_chunk is not None:
                new_chunks.append(current_chunk)
            
            total_duration = sum([c['duration'] for c in new_chunks[:-1]])
            new_chunks[-1]['duration'] = ((new_chunks[-1]['timestamp'][1]) - total_duration) + last_chunk_buffer
            
            print('new chunks', new_chunks)
            return new_chunks
        
        
        if self.audio_pipe is None:
            self.audio_pipe = 'distil-whisper/distil-large-v2'
        if isinstance(self.audio_pipe, str):
            model_id = self.audio_pipe
            torch_dtype = torch.float16 if torch.cuda.is_available() and self.audio_device != "cpu" else torch.float32
            audio_model = AutoModelForSpeechSeq2Seq.from_pretrained(model_id, torch_dtype=torch_dtype, low_cpu_mem_usage=True, use_safetensors=True)
            audio_model.to(self.audio_device)
            audio_processor = AutoProcessor.from_pretrained(model_id)
            audio_pipe =pipeline(
                "automatic-speech-recognition",
                model=audio_model,
                tokenizer=audio_processor.tokenizer,
                feature_extractor=audio_processor.feature_extractor,
                max_new_tokens=128,
                chunk_length_s=default_chunk_length_s,
                # chunk_length_s=15,
                batch_size=16,
                torch_dtype=torch_dtype,
                device=device,
            )
            self.audio_pipe = audio_pipe
        
        self.original_audio = wav_audio
        
        result = self.audio_pipe(wav_audio, return_timestamps=True)
        print(result)
        chunks = group_chunks(result['chunks'], desired_lengths=desired_lengths, max_length=max_length)
        return chunks, result['text']
        
    def sections_to_prompts(self, word_sections, flush_extractor=False, verbose=None):
        verbose = verbose if verbose is not None else self.verbose
        
        # Extract JSON Info
        extracted_json = extractor.extract_info(word_sections, flush=flush_extractor)
        # Generate Prompts
        prompts, info = extractor.prompts()
        
    
        if verbose:
            print(extractor.info)
            
            print('Prompts:')
            for p in prompts:
                print(p)
        return prompts, info
    
    def prompts_to_music(self, prompts, **kwargs):
        if isinstance(prompts, str):
            prompts = [DetailedInfo(None, None, prompts)]
        elif isinstance(prompts, dict):
            prompts = [DetailedInfo(prompts, None, prompts['text'])]
        elif isinstance(prompts, DetailedInfo):
            prompts = [prompts]
            
        if prompts is None or len(prompts) == 0:
            return
        
        if isinstance(prompts[0], dict):
            prompts = [DetailedInfo(p, None, p['text']) for p in prompts]
        elif isinstance(prompts[0], str):
            prompts = [DetailedInfo(None, None, p) for p in prompts]
            
        
        
        
        music = self.generator.generate_from_list(prompts, flush=True, **kwargs)
        save_file_loc = kwargs.get('save_file_loc', None)
        if save_file_loc is not None:
            if os.path.isdir(save_file_loc):
                base_name = os.path.basename(save_file_loc).split('.')[0] + '.wav'
                save_file_loc = os.path.join(save_file_loc, base_name)
            self.generator.save_audio(save_file_loc)
    
        
    # def generate_music(self, text, song_dur_seconds=SONG_DUR_SECONDS):
    def audio_to_music(self, wav_audio, song_dur_seconds=SONG_DUR_SECONDS, previous_song_duration=PREV_SONG_DUR, **kwargs):
        # Extract JSON Info
        print("Splitting Audio to chunks")
        chunks, text = self.audio_to_sections(wav_audio)
        print("Extracting Info from chunks")
        prompts, info = self.sections_to_prompts(chunks)
        
        save_info_loc = kwargs.get('save_info_loc', None)
        if save_info_loc is not None:
            # info_preped = [p.__dict__ for p in info]
            info_preped = [p.to_dict() for p in info]
            with open(save_info_loc, 'w') as f:
                json.dump(info_preped, f)
        
        print("Generating Music from prompts")
        self.prompts_to_music(info, **kwargs) # , song_dur_seconds=song_dur_seconds, previous_song_duration=previous_song_duration
        # self.prompts_to_music(prompts, song_dur_seconds=song_dur_seconds, previous_song_duration=previous_song_duration, **kwargs)
        return self.generator.song
        
        

In [None]:
# import wave
# import numpy as np



# file_path = "./conference_test/russell_m_nelson_10_23.wav"

# with wave.open(file_path, 'rb') as wav_file:
#     # Read the audio data
#     audio_data = wav_file.readframes(-1)
#     # Get the audio parameters
#     sample_width = wav_file.getsampwidth()
#     num_channels = wav_file.getnchannels()
#     sample_rate = wav_file.getframerate()
#     num_frames = wav_file.getnframes()

# # Convert audio data to numpy array
# # audio_data = np.frombuffer(audio_data, dtype=np.int16)
# audio_data = np.frombuffer(audio_data, dtype=np.float)


# # Print the audio parameters
# print("Sample Width:", sample_width)
# print("Number of Channels:", num_channels)
# print("Sample Rate:", sample_rate)
# print("Number of Frames:", num_frames)

from datasets import Dataset, Audio

audio_dataset = Dataset.from_dict({"audio": ["./conference_test/russell_m_nelson_10_23.wav"]}).cast_column("audio", Audio())



In [None]:
sample = audio_dataset[0]['audio']
sample.keys()

In [None]:
# def slice_audio(audio_array, sample_rate, duration):
#     """
#     Slices an audio array into snippets of a given maximum duration.
    
#     :param audio_array: NumPy array representing the audio signal.
#     :param sample_rate: Integer representing the number of samples per second in the audio.
#     :param duration: Float representing the maximum duration of each snippet in seconds.
#     :return: List of NumPy arrays, each representing a snippet of the audio.
#     """
#     # Calculate the number of samples per snippet
#     samples_per_snippet = int(sample_rate * duration)
    
#     # Calculate the total number of snippets
#     total_snippets = np.ceil(len(audio_array) / samples_per_snippet).astype(int)
    
#     # Slice the audio array into snippets
#     audio_snippets = [audio_array[i*samples_per_snippet : (i+1)*samples_per_snippet] for i in range(total_snippets)]
    
#     return audio_snippets

def slice_audio(audio_stuff, duration):
    """
    Slices an audio array into snippets of a given maximum duration.
    
    :param audio_array: NumPy array representing the audio signal.
    :param sample_rate: Integer representing the number of samples per second in the audio.
    :param duration: Float representing the maximum duration of each snippet in seconds.
    :return: List of NumPy arrays, each representing a snippet of the audio.
    """
    
    audio_array = audio_stuff['audio']
    
    
    
    # Calculate the number of samples per snippet
    samples_per_snippet = int(sample_rate * duration)
    
    # Calculate the total number of snippets
    total_snippets = np.ceil(len(audio_array) / samples_per_snippet).astype(int)
    
    # Slice the audio array into snippets
    audio_snippets = [audio_array[i*samples_per_snippet : (i+1)*samples_per_snippet] for i in range(total_snippets)]
    
    return audio_snippets

In [None]:

# dataset = load_dataset("distil-whisper/librispeech_long", "clean", split="validation")

# sample = dataset[0]["audio"]

# sample = {
#     "array": audio_data,
#     "sampling_rate": sample_rate
# }
sample = audio_dataset[0]['audio']

slices = slice_audio(sample['array'], sample['sampling_rate'], 240)






# pipe = Music_Gen_Pipeline(device=device, verbose=True)
pipe = Music_Gen_Pipeline(extractor=extractor, generator=generator, device=device, verbose=True)


start_time = time.time()
# display_audio(torch.from_numpy(sample['array']).unsqueeze(0), sample_rate=sample['sampling_rate'])
display_audio(torch.from_numpy(slices[0]).unsqueeze(0), sample_rate=sample['sampling_rate'])


song = pipe.audio_to_music(sample, save_file_loc='./conference_test/test.wav', save_info_loc='./conference_test/test_info.json')

end_time = time.time()

print(f"Time Elapsed: {end_time - start_time}")
display_audio(song, sample_rate=pipe.generator.sample_rate)




NameError: name 'audio_dataset' is not defined

In [None]:
def norm_sample_rate(audio_samples, sample_rates, target_rate=48000):
    if isinstance(audio_samples, torch.Tensor):
        audio_samples = [audio_samples]
    if isinstance(sample_rates, int):
        sample_rates = [sample_rates]
    ret_audio = []
    for audio, rate in zip(audio_samples, sample_rates):
        ret_audio.append(convert_audio(audio.unsqueeze(0), from_rate=rate, to_rate=target_rate, to_channels=1).squeeze())
    # return torch.stack(ret_audio)
    ret_audio = [a.numpy() for a in ret_audio]
    return ret_audio


# def load_audio(file_path):
#     # Load an audio file. Return the audio data and sample rate
#     audio, sr = librosa.load(file_path, sr=None)
#     return audio, sr

def adjust_volume(audio, percentage):
    # Adjust audio volume
    return audio * percentage

def combine_audio(speech_audio, background_audio, background_ratio=0.45):
    # Assuming the speech audio is the primary track and should not be shortened or lengthened
    # to match the background audio, we adjust the length of the background audio to match the speech audio
    min_len = min(len(speech_audio), len(background_audio))
    print(len(speech_audio), len(background_audio))
    combined_audio = speech_audio[:min_len] + adjust_volume(background_audio[:min_len], background_ratio)
    return combined_audio

# def adjust_volume(audio, percentage):
#     # This function now expects a PyTorch tensor as input
#     return audio * percentage

# # def combine_audio(speech_audio_dict, background_audio_tensor):
# def combine_audio(speech_audio, background_audio_tensor):

#     # Extract the actual audio data from the speech audio dictionary
#     # speech_audio = speech_audio_dict['audio']
    
#     # Ensure speech audio is a PyTorch tensor
#     if not isinstance(speech_audio, torch.Tensor):
#         # Assuming speech_audio is a numpy array or something similar;
#         # adjust this as necessary for your specific data format
#         speech_audio = torch.tensor(speech_audio, dtype=torch.float32)
    
#     # Ensure both tensors are on the same device
#     if background_audio_tensor.device != speech_audio.device:
#         background_audio_tensor = background_audio_tensor.to(speech_audio.device)
    
#     # Adjust lengths and combine
#     min_len = min(speech_audio.size(0), background_audio_tensor.size(0))
#     combined_audio = speech_audio[:min_len] + adjust_volume(background_audio_tensor[:min_len], 0.3)
    
#     return combined_audio

# sample = dataset[0]["audio"]

print(song.to("cpu"))

song_resampled = norm_sample_rate([song.to('cpu')], [pipe.generator.sample_rate], target_rate=sample['sampling_rate'])[0]



combined_audio = combine_audio(sample['array'], song_resampled)


display_audio(torch.from_numpy(combined_audio).unsqueeze(0), sample_rate=sample['sampling_rate'])


tensor([[[-0.0468, -0.0480, -0.0481,  ...,  0.1046,  0.1052,  0.1059]]])
999280 991680


In [None]:
from pydub import AudioSegment
import math

# Function to adjust volume based on a percentage
def adjust_volume(audio_track, percentage):
    # Convert percentage to dB
    change_in_dB = 20 * math.log10(percentage)
    return audio_track + change_in_dB


background_adjusted = adjust_volume(song, 0.3)






In [None]:
# book_text_loc = 'prompt_examples/fotr_gandalf_balrog.txt'
# book_text_loc = 'prompt_examples/fotr_gandalf_balrog.txt'


def full_pipeline(book_text_loc,
                  extractor=None,
                  generator=None,
                  group_word_count=GROUP_WORD_COUNT, 
                  max_group_count=MAX_GROUP_CNT,
                  song_dur_seconds=SONG_DUR_SECONDS, 
                  previous_song_duration=PREV_SONG_DUR,
                  device="cpu",
                  save_file_loc=None,
                  verbose=True,
                  flush_extractor=True):
    # Load in Text and Split into Sections
    word_sections = load_word_sections(book_text_loc, group_word_count)

    if max_group_count is not None and len(word_sections) > max_group_count:
        word_sections = word_sections[:max_group_count]

    if verbose:
        print('section count:', len(word_sections))

    if extractor is None:
        if verbose:
            print('Creating new extractor')
        extractor = LLMPromptGenerator()
    
    # Extract JSON Info
    extracted_json = extractor.extract_info(word_sections, flush=flush_extractor)
    # Generate Prompts
    prompts = extractor.prompts()
    
    print(extractor.info)
   
    if verbose:
        print('Prompts:')
        for p in prompts:
            print(p)
    
    if generator is None:
        if verbose:
            print('Creating new generator')
        generator = GenMusicFromPrompt(duration=song_dur_seconds, device=device, previous_song_duration=previous_song_duration)
    

    music = generator.generate_from_list(prompts, verbose=verbose, flush=True)
    if save_file_loc is not None:
        if os.path.isdir(save_file_loc):
            base_name = os.path.basename(book_text_loc).split('.')[0] + '.wav'
            save_file_loc = os.path.join(save_file_loc, base_name)
        generator.save_audio(save_file_loc)
    
    return generator, prompts #, extracted_json



In [None]:
# book_text_loc = 'prompt_examples/wok_war.txt'
book_text_loc = 'prompt_examples/fotr_gandalf_balrog.txt'

# book_text_loc = 'prompt_examples/cith_things.txt'
# book_text_loc = './prompt_examples/rots_obiewan_vs_anakin_pt1.txt'
# book_text_loc = './prompt_examples/raj_romeo_stalking_pt1.txt'


output_dir = './generated_examples_midterm/'
print('book_text_loc:', book_text_loc)
generator, prompts = full_pipeline(book_text_loc, extractor=extractor, save_file_loc=output_dir, device=device)

book_text_loc: prompt_examples/fotr_gandalf_balrog.txt
section count: 3




[MusicGenInfo(short_term=ShortTermAttributes(tone='mystical', intensity='soft', is_crescendo=False, volume='low'), long_term=LongTermAttributes(instrumentation='orchestral', short_background_ambient_setting='mystical forest', short_music_descriptors='ethereal', pitch='C', beat='slow', is_major_key=True)), MusicGenInfo(short_term=ShortTermAttributes(tone='mellow', intensity='soft and subtle', is_crescendo=False, volume='low'), long_term=LongTermAttributes(instrumentation='piano', short_background_ambient_setting='a soft, gentle melody', short_music_descriptors='a soothing, calming atmosphere', pitch='C', beat='slow and steady', is_major_key=True)), MusicGenInfo(short_term=ShortTermAttributes(tone='mysterious', intensity='soft', is_crescendo=False, volume='low'), long_term=LongTermAttributes(instrumentation='orchestral', short_background_ambient_setting='mysterious', short_music_descriptors='ethereal', pitch='middle', beat=' steady', is_major_key=False))]
Prompts:
mystical forest ambient

  0%|          | 0/3 [00:00<?, ?it/s]

  1503 /   1500

  return F.conv1d(input, weight, bias, self.stride,
 33%|███▎      | 1/3 [01:26<02:53, 86.79s/it]

  1003 /   1600

 67%|██████▋   | 2/3 [02:51<01:25, 85.27s/it]

  1000 /   1600

100%|██████████| 3/3 [04:15<00:00, 85.08s/it]

  1003 /   1600




In [None]:
for prompt in prompts:
    print(prompt)
    print('---')

outdoors setting, calm tone, low intensity, piano instrumentals, C pitch
---
A dark and stormy night setting, Serene tone, Soft intensity, Piano instrumentals, Middle C pitch
---
a clear summer evening setting, serene tone, soft intensity, piano instrumentals, C major pitch
---


## Left to work on

#### Before Midterm
* Metric for generated music - What is good?
* Look at generation times vs quality (Facebook model - Is live generation feasible?)

#### After Midterm
* Modify the LLM class to pass in a giant text file, batch, and generate json
* Pipeline the LLM and GenMusic class in a pipeline class
* Prompt engineering
* Fun: Karaoke interface 
