In [None]:
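# Install the required packages (run once, e.g. by uncommenting the line below).
# The notebook also imports torch, tqdm, matplotlib, speechbrain and (inside detect_mood) librosa.
#pip install faster_whisper diffusers transformers accelerate xformers scipy sounddevice pydub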

In [None]:
# All required imports
import sounddevice as sd
from scipy.io.wavfile import write
from pydub import AudioSegment
#from pydub.utils import which
import os

import torch

from tqdm import tqdm
from diffusers import DiffusionPipeline, DPMSolverMultistepScheduler
import matplotlib.pyplot as plt

import re


# NOTE(review): presumably a workaround for the "duplicate OpenMP runtime" abort that can
# occur when several imported libraries bundle their own OpenMP — confirm it is still needed.
os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE"

In [None]:
# Record audio using microphone



# Set working directory. The original location is kept as the default; set the
# SOUNDTOIMAGE_DIR environment variable to use another folder without editing this cell.
os.chdir(os.environ.get("SOUNDTOIMAGE_DIR", r"D:\soundtoimage"))

def record_audio(duration, sample_rate=44100, filename="temp.wav"):
    """Record mono 16-bit audio from the default input device and save it as a WAV file.

    Args:
        duration: Recording length in seconds; must be positive.
        sample_rate: Sampling rate in Hz; must be positive.
        filename: Destination WAV path. Defaults to "temp.wav", the path that was
            previously hard-coded.

    Raises:
        ValueError: If ``duration`` or ``sample_rate`` is not positive.
    """
    # Validate before touching the audio device so bad arguments fail fast.
    if duration <= 0:
        raise ValueError(f"duration must be positive, got {duration!r}")
    if sample_rate <= 0:
        raise ValueError(f"sample_rate must be positive, got {sample_rate!r}")

    print("Recording...")
    audio_data = sd.rec(int(duration * sample_rate), samplerate=sample_rate, channels=1, dtype='int16')
    sd.wait()  # Wait until recording is finished
    print("Recording complete. Saving as WAV...")
    write(filename, sample_rate, audio_data)  # Save as WAV file

    # Report whether the WAV file was actually created
    if os.path.exists(filename):
        print(f"{filename} file found.")
    else:
        print(f"Warning: {filename} was not created.")

# Record 20 seconds of audio; record_audio saves it as "temp.wav"
record_audio(duration=20)


In [None]:
# NOTE: wav_file_path is only used for the existence check on the next line.
wav_file_path = "temp.wav" # Path of the WAV file written by record_audio
print(os.path.exists(wav_file_path))  # Should print True if the file exists



# Convert the recorded WAV file to MP3
def convert_audio(filename, source="temp.wav"):
    """Convert a WAV file to MP3.

    Args:
        filename: Destination path of the MP3 file.
        source: Path of the WAV file to read. Defaults to "temp.wav", the path that
            was previously hard-coded.

    Returns:
        ``filename`` when the export succeeded, ``None`` when a FileNotFoundError
        was raised while reading or exporting (the error is printed, not re-raised).
    """
    print("Converting to MP3...")
    try:
        audio = AudioSegment.from_wav(source)
        audio.export(filename, format="mp3")
    except FileNotFoundError as e:
        # NOTE(review): presumably this also covers a missing ffmpeg executable — confirm.
        print("Error: File not found.")
        print(e)
        return None
    print(f"Saved as {filename}.")
    return filename

# Convert the recording to "recorded.mp3"
mp3_file = convert_audio("recorded.mp3")

In [None]:
def safe_str(s):
    """Escape ``s`` for embedding inside a single-quoted Python string literal.

    Backslashes are escaped before single quotes; otherwise a backslash already
    present in ``s`` would merge with the escape added for a quote and the
    printed literal would no longer evaluate back to ``s``.

    Args:
        s: Text to escape.

    Returns:
        The text with ``\\`` and ``'`` backslash-escaped.
    """
    return s.replace('\\', '\\\\').replace('\'', '\\\'')

In [None]:
# Report the Torch build and whether CUDA can be used
for label, value in (
    ("Torch version:", torch.__version__),
    ("CUDA available:", torch.cuda.is_available()),
    ("CUDA version:", torch.version.cuda),
):
    print(label, value)

In [None]:
from faster_whisper import WhisperModel

# Load the Whisper model used for text transcription (medium size, int8 compute type, CPU)
model = WhisperModel('medium', compute_type='int8', device="cpu")

# Transcribe the recording with word-level timestamps. transcribe() hands back a lazy,
# one-shot generator of segments; turn it into a list so the following cells can be
# re-run without ending up with an empty transcript.
segments, info = model.transcribe("temp.wav", word_timestamps=True)
segments = list(segments)

In [None]:

# Split the transcript into short phrases. A phrase is closed at a word that ends with
# punctuation, at a word that takes longer than SPEAK_PAUSE to speak, or at the last
# word of a Whisper segment.
SPEAK_PAUSE = 1  # seconds; a word spoken for longer than this closes the current phrase
PHRASE_END_CHARS = tuple(',!.;?')  # a word ending in one of these closes the current phrase

transcript = ''
segment_timestamps, word_timestamps = [], []  # entries: [start, end, text]
start = None  # start time of the phrase currently being collected
segment_str = ''

for s in segments:
    transcript += s.text
    words = s.words or []  # tolerate segments that carry no word list
    last_idx = len(words) - 1
    for idx, word in enumerate(words):
        word_timestamps.append([word.start, word.end, word.word])
        segment_str += word.word

        if start is None:
            start = word.start

        # rstrip + endswith instead of word.word[-1]: safe for empty word text and
        # still detects punctuation that is followed by trailing whitespace.
        ends_with_punctuation = word.word.rstrip().endswith(PHRASE_END_CHARS)
        spoken_slowly = (word.end - word.start) > SPEAK_PAUSE
        if ends_with_punctuation or spoken_slowly or idx == last_idx:
            phrase = segment_str.strip()
            if phrase:  # do not record whitespace-only phrases
                segment_timestamps.append([start, word.end, phrase])
            segment_str = ''
            start = None

# Full transcript text
print('Transcript')
print(transcript)

# One line per word: start, end, text
print('Automatically generated word timestamps')
for word_row in word_timestamps:
    print(word_row[0], word_row[1], word_row[2])
print()

# One line per phrase: start, end, text
print('Automatically generated segment timestamps')
for segment_row in segment_timestamps:
    print(segment_row[0], segment_row[1], segment_row[2])

In [None]:
# The text of every detected phrase becomes a candidate prompt
prompts = [entry[2] for entry in segment_timestamps]

# Print the prompts in the shape of a Python list so they can be copied into a later cell
print('Current Prompts')
print('[')
for prompt_text in prompts:
    print("\t'" + safe_str(prompt_text) + "',")
print(']')

In [None]:
# Test image generation for the prompts to experiment with prompts for Stable Diffusion

# Load the Stable Diffusion v1.5 pipeline with float16 weights
pipe = DiffusionPipeline.from_pretrained(
    "runwayml/stable-diffusion-v1-5",
    torch_dtype=torch.float16,
    safety_checker=None  # Safety checker disabled; ensure ethical use
)
# NOTE(review): the device is hard-coded to "cuda"; presumably this cell fails on machines
# without a CUDA GPU — confirm whether a CPU fallback is wanted.
pipe = pipe.to("cuda")  # Move pipeline to GPU

# Prompts used for the test run
test_prompts = [
    'Goat,',
    'animal,',
    'water,',
    'the goat man is',
    'jumping from the city skyline,',
    'Jesus Christ,',
    'keine',
    'Ahnung,',
    'ich frag mich ob es auch in beiden Sprachen geht,',
    'ich nehme einfach eine 20',
    'Sekunden Audio gerade auf mit irgendwelchen Worten,',
    'Obama,',
    'Bush,',
    'nein',
]

# Nothing to render without prompts
if len(test_prompts) == 0:
    raise ValueError("No prompts provided for testing.")

# Echo the prompts in the shape of a Python list
print('Current Testing Prompts (test images will be generated):')
print('[')
for test_prompt in test_prompts:
    print("\t'" + test_prompt + "',")
print(']')

batch_size = 4  # number of prompts sent to the pipeline per call

# Optimize pipeline
pipe.enable_attention_slicing()
pipe.unet.to(memory_format=torch.channels_last)

# xformers is optional: enable it when possible, otherwise carry on without it.
# Besides NotImplementedError, diffusers reports a missing xformers package with
# ModuleNotFoundError and an unavailable CUDA device with ValueError, so all three
# have to be caught for the fallback to work.
try:
    pipe.enable_xformers_memory_efficient_attention()
except (ModuleNotFoundError, ValueError, NotImplementedError) as e:
    print("xformers not supported, proceeding without it:", e)

# Cut the prompt list into consecutive batches of at most batch_size prompts
prompt_chunks = [
    test_prompts[offset : offset + batch_size]
    for offset in range(0, len(test_prompts), batch_size)
]

# Run the pipeline once per batch and collect every generated image in order
images = []
for chunk in tqdm(prompt_chunks, desc="Generating Images"):
    result = pipe(chunk, num_inference_steps=20, guidance_scale=8.5)
    images.extend(result.images)

# Show every image with its prompt as the title
for image_idx in range(min(len(test_prompts), len(images))):
    plt.imshow(images[image_idx])
    plt.title(test_prompts[image_idx])
    plt.axis("off")
    plt.show()


In [None]:
# Pair every Stable Diffusion prompt with the transcript phrase whose timestamp it should get
prompt_transcript_pairs = [ # each element is either a prompt string or a (prompt string, matching transcript phrase) tuple
    # PUT PAIRS HERE
    # First tuple element: the text printed as the prompt; second: the phrase looked up in the transcript
    ('Goat','Goat'),
    ('animal','animal'),
    ('water','water'),
    ('goat man','the goat man is'),
    ('jumping from the city skyline','jumping from the city skyline'),
    ('Jesus Christ','Jesus Christ'),
    ('keine','keine'),
    ('beiden Sprachen','ich frag mich ob es auch in beiden Sprachen geht,'),
    ('nehme einfach eine 20','ich nehme einfach eine 20'),
    ('irgendwelchen Worten','Sekunden Audio gerade auf mit irgendwelchen Worten,'),
    ('Obama','Obama,'),
    ('Bush','Bush,'),
    ('nein','nein')
]


def to_alnum(s):
    """Remove everything except word characters (letters, digits, underscore) and whitespace.

    The word-character class is Unicode-aware for ``str`` patterns, so umlauts and
    other non-ASCII letters in the German parts of the transcript are kept instead
    of being cut out of the words.
    """
    return re.sub(r'[^\w\s]+', '', s)


transcript_words = to_alnum(transcript.lower()).split()
print('TEXT TRANSCRIPT')
start_word_idx = 0  # search position; moves forward as prompts are located
for p_idx, p in enumerate(prompt_transcript_pairs):
    # An entry is either a bare string (prompt == transcript phrase) or a
    # (prompt, transcript phrase) tuple.
    if isinstance(p, str):
        transcript_text = p
    else:
        p, transcript_text = p

    for search_word_idx, word in enumerate(to_alnum(transcript_text).split()):
        try:
            word_idx = transcript_words.index(word.lower(), start_word_idx)
        except ValueError:
            # list.index reports a missing value with ValueError (not IndexError)
            raise ValueError(f'{word} not found in transcript; prompt: {p}') from None
        if search_word_idx == 0:
            start_word_idx = word_idx

        # Every following word must sit directly after the previous one
        if word_idx != start_word_idx + search_word_idx:
            break  # phrase is not contiguous here; the prompt is skipped without output
    else:
        # NOTE(review): assumes transcript_words and word_timestamps are index-aligned — confirm
        s_time = 0 if p_idx == 0 else word_timestamps[start_word_idx][0]
        # Printed as a "(time, 'prompt')," tuple literal
        print(f'({s_time}, \'{safe_str(p)}\'),')

In [None]:
# Pair every Stable Diffusion prompt with the transcript phrase whose timestamp it should get
prompt_transcript_pairs = [ # each element is either a prompt string or a (prompt string, matching transcript phrase) tuple
    # PUT PAIRS HERE
    # First tuple element: the text printed as the prompt (left empty here; the original note
    # suggests putting a mood there); second: the phrase looked up in the transcript
    ('','Goat'),
    ('','animal'),
    ('','water'),
    ('','the goat man is'),  # the comma was missing here, which made Python call the tuple
    ('','jumping from the city skyline'),
    ('','Jesus Christ'),
    ('','keine'),
    ('','ich frag mich ob es auch in beiden Sprachen geht,'),
    ('','ich nehme einfach eine 20'),
    ('','Sekunden Audio gerade auf mit irgendwelchen Worten,'),
    ('','Obama,'),
    ('','Bush,'),
    ('','nein'),
]


def to_alnum(s):
    """Remove everything except word characters (letters, digits, underscore) and whitespace.

    The word-character class is Unicode-aware for ``str`` patterns, so umlauts and
    other non-ASCII letters in the German parts of the transcript are kept instead
    of being cut out of the words.
    """
    return re.sub(r'[^\w\s]+', '', s)


transcript_words = to_alnum(transcript.lower()).split()
print('TEXT TRANSCRIPT')
start_word_idx = 0  # search position; moves forward as prompts are located
for p_idx, p in enumerate(prompt_transcript_pairs):
    # An entry is either a bare string (prompt == transcript phrase) or a
    # (prompt, transcript phrase) tuple.
    if isinstance(p, str):
        transcript_text = p
    else:
        p, transcript_text = p

    for search_word_idx, word in enumerate(to_alnum(transcript_text).split()):
        try:
            word_idx = transcript_words.index(word.lower(), start_word_idx)
        except ValueError:
            # list.index reports a missing value with ValueError (not IndexError)
            raise ValueError(f'{word} not found in transcript; prompt: {p}') from None
        if search_word_idx == 0:
            start_word_idx = word_idx

        # Every following word must sit directly after the previous one
        if word_idx != start_word_idx + search_word_idx:
            break  # phrase is not contiguous here; the prompt is skipped without output
    else:
        # NOTE(review): assumes transcript_words and word_timestamps are index-aligned — confirm
        s_time = 0 if p_idx == 0 else word_timestamps[start_word_idx][0]
        # Printed as a "(time, 'prompt')," tuple literal
        print(f'({s_time}, \'{safe_str(p)}\'),')

In [None]:
from speechbrain.pretrained import EncoderClassifier

# Load the pretrained "speechbrain/spkrec-ecapa-voxceleb" model; savedir="tmpdir" is passed as its local save directory.
# NOTE(review): `classifier` is not referenced by any later cell in this notebook —
# presumably this is only a check that SpeechBrain models can be loaded; confirm.
classifier = EncoderClassifier.from_hparams(
    source="speechbrain/spkrec-ecapa-voxceleb",
    savedir="tmpdir"
)
print("Model loaded successfully!")


In [None]:
from speechbrain.pretrained import SpeakerRecognition

# Function to clean text down to word characters and whitespace

def to_alnum(s):
    """Remove everything except word characters (letters, digits, underscore) and whitespace.

    The word-character class is Unicode-aware for ``str`` patterns, so umlauts and
    other non-ASCII letters (the transcript contains German text) are kept instead
    of being cut out of the words.

    Args:
        s: Text to clean.

    Returns:
        The text with punctuation and other symbols removed.
    """
    return re.sub(r'[^\w\s]+', '', s)

# Mood detection function using SpeechBrain
def detect_mood(audio_file, chunk_duration=2, model_source="speechbrain/emotion-recognition"):
    """Classify the mood of consecutive chunks of an audio file.

    The file is loaded at 16 kHz and cut into ``chunk_duration``-second pieces; only
    whole seconds of the recording are considered. Each piece is written to
    ``chunk_<start>_<end>.wav`` in the current working directory (the files are left
    there) and passed to the model's ``classify_file``.

    Args:
        audio_file: Path of the audio file to analyse.
        chunk_duration: Chunk length in whole seconds (default 2).
        model_source: Model identifier passed to ``SpeakerRecognition.from_hparams``.
            NOTE(review): the default id is kept from the original code — confirm that
            it names an existing emotion-recognition model.

    Returns:
        list[str]: One mood per chunk; labels without a mapping become "unknown".
    """
    # librosa is only needed here and was never imported by the notebook.
    import librosa

    # Load pre-trained model
    emotion_model = SpeakerRecognition.from_hparams(source=model_source, savedir="tmp")

    # Load the audio file, resampled to 16 kHz
    signal, sr = librosa.load(audio_file, sr=16000)
    # Keyword form: accepted by both old and current librosa releases
    duration = librosa.get_duration(y=signal, sr=sr)

    # Map emotions to moods (built once instead of once per chunk).
    # NOTE(review): the three-letter keys are assumed to be the labels emitted by
    # IEMOCAP-trained classifiers — confirm against the model's label encoder.
    mood_mapping = {
        "happy": "energetic",
        "excited": "energetic",
        "sad": "melancholic",
        "angry": "intense",
        "neutral": "neutral",
        "calm": "calm",
        "hap": "energetic",
        "exc": "energetic",
        "ang": "intense",
        "neu": "neutral",
    }

    mood_list = []
    for start in range(0, int(duration), chunk_duration):
        end = min(int(duration), start + chunk_duration)
        chunk = signal[int(start * sr):int(end * sr)]

        # Save the chunk temporarily for processing. librosa.output.write_wav no longer
        # exists in current librosa, so the scipy writer imported at the top is used.
        chunk_file = f"chunk_{start}_{end}.wav"
        write(chunk_file, sr, chunk)

        # Run emotion detection. classify_file is expected to return the tuple
        # (out_prob, score, index, text_lab); a dict with a 'class' key, as the
        # original code assumed, is still accepted.
        prediction = emotion_model.classify_file(chunk_file)
        if isinstance(prediction, dict):
            emotion = prediction['class']
        else:
            labels = prediction[3]
            emotion = labels[0] if isinstance(labels, (list, tuple)) else labels

        mood_list.append(mood_mapping.get(str(emotion).lower(), "unknown"))

    return mood_list

# Load Stable Diffusion pipeline (float16 weights).
# NOTE(review): this loads the same model with the same settings as the earlier test cell
# and rebinds `pipe` — presumably the already loaded pipeline could be reused; confirm.
pipe = DiffusionPipeline.from_pretrained(
    "runwayml/stable-diffusion-v1-5",
    torch_dtype=torch.float16,
    safety_checker=None  # Safety checker disabled; ensure ethical use
)
# The device is hard-coded to "cuda"
pipe = pipe.to("cuda")

# Define prompts (this rebinds the `prompts` list derived from the transcript in an earlier cell)
prompts = [
    'Goat',
    'animal',
    'water',
    'the goat man is',
    'jumping from the city skyline',
    'Jesus Christ',
    'keine',
    'ich frag mich ob es auch in beiden Sprachen geht',
    'ich nehme einfach eine 20',
    'Sekunden Audio gerade auf mit irgendwelchen Worten',
    'Obama',
    'Bush',
    'nein',
]

# Define transcript and extract moods
# NOTE(review): the next two lines overwrite the `transcript` and `word_timestamps`
# produced by the transcription cells with placeholders — confirm that this is intended.
transcript = "This is your audio transcript with sentences corresponding to prompts."  # Replace with actual transcript
word_timestamps = []  # Replace with word-level timestamps if available

# NOTE(review): placeholder path; the recording made earlier in this notebook is "temp.wav".
audio_file = "your_audio_file.wav"  # Replace with the actual audio file path
moods = detect_mood(audio_file)

# Make the mood list exactly as long as the prompt list:
# pad with "neutral" when it is too short, cut off the tail when it is too long
while len(moods) < len(prompts):
    moods.append("neutral")
del moods[len(prompts):]

# One (mood, prompt) tuple per prompt
prompt_transcript_pairs = list(zip(moods, prompts))

# Show the resulting pairs, one per line
print("Prompt-Transcript Pairs:")
for mood_prompt_pair in prompt_transcript_pairs:
    print(mood_prompt_pair)

# Batch size and attention optimization
batch_size = 4  # number of prompts sent to the pipeline per call
pipe.enable_attention_slicing()
pipe.unet.to(memory_format=torch.channels_last)

# xformers is optional: enable it when possible, otherwise carry on without it.
# Besides NotImplementedError, diffusers reports a missing xformers package with
# ModuleNotFoundError and an unavailable CUDA device with ValueError, so all three
# have to be caught for the fallback to work.
try:
    pipe.enable_xformers_memory_efficient_attention()
except (ModuleNotFoundError, ValueError, NotImplementedError) as e:
    print("xformers is not fully supported, proceeding without it:", e)

# Cut the prompt list into consecutive batches of at most batch_size prompts
prompt_chunks = [
    prompts[offset : offset + batch_size]
    for offset in range(0, len(prompts), batch_size)
]

# Run the pipeline once per batch and collect every generated image in order
images = []
for chunk in tqdm(prompt_chunks, desc="Generating Images"):
    result = pipe(chunk, num_inference_steps=20, guidance_scale=8.5)
    images.extend(result.images)

# Show every image with its prompt as the title
for image_idx in range(min(len(prompts), len(images))):
    plt.imshow(images[image_idx])
    plt.title(prompts[image_idx])
    plt.axis("off")
    plt.show()
