You might consider using a downloader such as yt-dlp to rip only the first n seconds of the audio (the example below keeps the first 600 seconds):
yt-dlp -x --audio-format mp3 --postprocessor-args "-ss 0 -t 600" -o "custom_filename.%(ext)s" https://www.youtube.com/watch?v=RzcrHxWHH-0


In [None]:
%pip install -r requirements.txt

In [13]:
import cairo
import math
import numpy as np
import multiprocessing as mp
import imageio
import random
#import tqdm
# from tqdm.notebook import tqdm
from tqdm import tqdm
from pydub import AudioSegment
import subprocess
import json

import time

In [3]:
WIDTH, HEIGHT = 512, 512  # Reduced size for faster processing
SPIRAL_COLOR = (191, 0, 251)  # Purple/violet; components appear to be on a 0-255 scale
BG_COLOR = (0, 0, 0)  # Black
SPIN_SPEED = 1.0  # Multiplies the rotation term (time * 5) in create_spiral_frame
THROB_SPEED = 1.0  # Multiplies the time argument of the pulsing exponent
THROB_STRENGTH = 1.0  # NOTE(review): not referenced by any code visible in this notebook
ZOOM = 1.0  # Multiplies the radial term (dist * 40) that sets arm spacing


In [5]:
def create_spiral_frame(time):
    """Render one frame of the pulsing spiral at animation time ``time``.

    Every pixel is mapped to coordinates centred on the image, converted to
    an angle and a (time-dependent, power-warped) distance, and coloured by
    blending ``SPIRAL_COLOR`` and ``BG_COLOR`` according to a sharpened sine
    of those two values.

    Args:
        time: Animation time as a number. The parameter name shadows the
            ``time`` module inside this function only.

    Returns:
        A ``(HEIGHT, WIDTH, 3)`` ``uint8`` array built from the cairo
        surface buffer with the channels reordered to R, G, B.
    """
    def _unit_rgb(color):
        # cairo's set_source_rgb takes components in [0, 1] and clamps
        # anything larger, so a 0-255 triple such as (191, 0, 251) would
        # render as pure magenta. Treat any triple with a component above 1
        # as 8-bit and rescale it; triples already within [0, 1] pass
        # through unchanged.
        if max(color) > 1:
            return tuple(component / 255.0 for component in color)
        return tuple(float(component) for component in color)

    spiral_rgb = _unit_rgb(SPIRAL_COLOR)
    bg_rgb = _unit_rgb(BG_COLOR)

    surface = cairo.ImageSurface(cairo.FORMAT_ARGB32, WIDTH, HEIGHT)
    ctx = cairo.Context(surface)

    # Fill background
    ctx.set_source_rgb(*bg_rgb)
    ctx.paint()

    # These depend only on ``time`` and the module constants, so compute
    # them once instead of once per pixel.
    throb_exponent = 0.4 + math.sin((time + math.cos(time * 0.05) * 0.1) * THROB_SPEED) * 0.2
    spin_phase = time * 5 * SPIN_SPEED
    aspect = HEIGHT / WIDTH

    for x in range(WIDTH):
        truPos_x = (x / WIDTH - 0.5) * 2
        for y in range(HEIGHT):
            truPos_y = (y / HEIGHT - 0.5) * 2 * aspect

            angle = math.atan2(truPos_y, truPos_x)
            dist = math.pow((truPos_x**2 + truPos_y**2)**0.5, throb_exponent)

            # sin(...) + 1 lies in [0, 2]; raising it to the 50th power and
            # clamping to [0, 1] leaves thin arms where the factor is near 0.
            spi_factor = math.pow(math.sin(angle + dist * 40 * ZOOM - spin_phase) + 1.0, 50)
            spi_factor = max(0, min(spi_factor, 1))

            r, g, b = (
                spiral_c * (1 - spi_factor) + bg_c * spi_factor
                for spiral_c, bg_c in zip(spiral_rgb, bg_rgb)
            )

            ctx.set_source_rgb(r, g, b)
            ctx.rectangle(x, y, 1, 1)
            ctx.fill()

    # Make sure all pending drawing has reached the pixel buffer before it
    # is read directly.
    surface.flush()

    # Convert Cairo surface to numpy array
    # NOTE(review): assumes the surface stride is exactly WIDTH * 4 bytes and
    # that the bytes are laid out B, G, R, A — confirm for the target platform.
    buf = surface.get_data()
    data = np.ndarray(shape=(HEIGHT, WIDTH, 4), dtype=np.uint8, buffer=buf)
    return data[:, :, [2, 1, 0]]  # Convert BGRA to RGB


In [None]:

def create_animation(output_file, format='mp4', total_frames=60, fps=30, time_step=0.1):
    """Render the spiral frame by frame and save it as a video or GIF.

    Args:
        output_file: Path of the file to write.
        format: ``'mp4'`` or ``'gif'``.
        total_frames: Number of frames to render (default 60).
        fps: Frame rate passed to imageio when writing (default 30).
        time_step: Animation time added per frame; frame ``i`` is rendered
            at ``i * time_step`` (default 0.1).

    Raises:
        ValueError: If ``format`` is neither ``'mp4'`` nor ``'gif'``. This is
            checked before any frame is rendered so an invalid format does
            not waste the whole rendering pass.
    """
    if format not in ('mp4', 'gif'):
        raise ValueError("Unsupported format. Choose 'mp4' or 'gif'.")

    frames = []

    # Create a progress bar
    with tqdm(total=total_frames, desc="Creating frames", unit="frame") as pbar:
        for i in range(total_frames):
            frames.append(create_spiral_frame(i * time_step))
            pbar.update(1)  # Update progress bar

    print("Saving animation...")
    if format == 'mp4':
        with imageio.get_writer(output_file, fps=fps) as writer:
            for frame in tqdm(frames, desc="Writing frames", unit="frame"):
                writer.append_data(frame)
    else:
        # Same call as the parallel variants in this notebook; the former
        # ``progress_bar=True`` keyword is dropped because it is not an
        # option of the GIF writer.
        imageio.mimsave(output_file, frames, fps=fps)

    print(f"Animation saved as {output_file}")

# Render the spiral and write it to spiral_animation.mp4 (path is relative to
# the notebook's working directory).
create_animation('spiral_animation.mp4', format='mp4')

# Alternative output: uncomment to write an animated GIF instead.
# create_animation('spiral_animation.gif', format='gif')

In [9]:
def extract_random_segment(file_path, duration_seconds):
    """Cut a randomly positioned segment out of an MP3 file.

    Args:
        file_path: Path of the MP3 file to read.
        duration_seconds: Length of the segment in seconds. Fractional
            values are accepted and rounded to the nearest millisecond.

    Returns:
        The slice of the loaded ``AudioSegment`` covering the chosen window.

    Raises:
        ValueError: If the requested duration is not positive, or is longer
            than the audio itself.
    """
    # Work in whole milliseconds: random.randint and the slice below need
    # integers, and a float duration (e.g. 2.5) would otherwise produce a
    # float here.
    segment_length_ms = int(round(duration_seconds * 1000))
    if segment_length_ms <= 0:
        raise ValueError("Segment duration must be positive.")

    # Load the audio file (done after validation so bad arguments fail fast)
    audio = AudioSegment.from_mp3(file_path)

    # len() of the loaded audio is its duration in milliseconds
    audio_length_ms = len(audio)
    if segment_length_ms > audio_length_ms:
        raise ValueError("Segment length is greater than the audio length.")

    # Determine a random starting point such that the segment fits entirely
    start_ms = random.randint(0, audio_length_ms - segment_length_ms)

    return audio[start_ms:start_ms + segment_length_ms]

# Example usage: cut a random 10-second clip from the downloaded track and
# save it next to the source file. The output path uses a forward slash like
# the input path; a backslash is a literal filename character on POSIX and
# would not place the file inside the audio/ directory there.
random_segment = extract_random_segment(r"audio/Radio Chatter & Spaceship Patrol Flight in Saturn Ring. Sci-Fi Ambience for Sleep, Study, Relaxing [RzcrHxWHH-0].mp3", 10)
random_segment.export(r"audio/random_segment.mp3", format="mp3")


In [2]:
# Decode the downloaded MP3 into a pydub AudioSegment.
# NOTE(review): `audio` is not referenced by any later cell visible in this notebook.
file_path = r"audio/Radio Chatter & Spaceship Patrol Flight in Saturn Ring. Sci-Fi Ambience for Sleep, Study, Relaxing [RzcrHxWHH-0].mp3"
audio = AudioSegment.from_mp3(file_path)

In [1]:
import multiprocessing
import numpy as np
import imageio
from stub import process_chunk

# Constants
# NOTE(review): this cell redefines WIDTH/HEIGHT and the colour and speed
# constants already set in the earlier configuration cell. The colours here
# are fractional triples rather than the integer triples used there —
# confirm which scale process_chunk expects.
WIDTH, HEIGHT = 800, 600
BG_COLOR = (0.1, 0.1, 0.1)
SPIRAL_COLOR = (0.8, 0.8, 0.8)
THROB_SPEED = 2.0
ZOOM = 0.5
SPIN_SPEED = 0.5

def create_spiral_frame(time):
    """Render one frame by splitting its rows across a process pool.

    The image rows ``[0, HEIGHT)`` are divided into one contiguous band per
    worker; each band is handed to ``process_chunk`` together with the
    rendering constants, and the returned pieces are stacked vertically.

    Args:
        time: Animation time forwarded unchanged to every ``process_chunk``
            call.

    Returns:
        ``np.vstack`` of the per-band results, in top-to-bottom order.
        NOTE(review): assumes each ``process_chunk`` result is an array whose
        first axis is the band's rows — confirm against stub.py.
    """
    # Never use more workers than there are rows, and always at least one.
    num_processes = max(1, min(multiprocessing.cpu_count(), HEIGHT))

    # Band boundaries that always end exactly at HEIGHT. Plain
    # ``HEIGHT // num_processes`` chunks silently drop the trailing rows
    # whenever HEIGHT is not a multiple of the worker count.
    row_bounds = [HEIGHT * i // num_processes for i in range(num_processes + 1)]

    args = [(row_bounds[i],
             row_bounds[i + 1],
             WIDTH, HEIGHT, time,
             BG_COLOR, SPIRAL_COLOR,
             THROB_SPEED, ZOOM, SPIN_SPEED)
            for i in range(num_processes)]

    # The context manager releases the worker processes even if a chunk raises.
    with multiprocessing.Pool(processes=num_processes) as pool:
        results = pool.map(process_chunk, args)

    return np.vstack(results)

def create_animation(output_file, format='mp4'):
    frames = []
    for i in range(60):
        time = i * 0.1
        frame = create_spiral_frame(time)
        frames.append(frame)

    if format == 'mp4':
        with imageio.get_writer(output_file, fps=30

SyntaxError: incomplete input (2864971077.py, line 42)

In [4]:
def create_frame_wrapper(args):
    """Adapter that lets a pool map over ``(time, index)`` pairs.

    Args:
        args: A two-item sequence; the first item is the animation time and
            the second is ignored.

    Returns:
        Whatever ``create_spiral_frame`` returns for that time.
    """
    frame_time, _unused_index = args
    return create_spiral_frame(frame_time)

def create_animation_parallel(output_file, format='mp4', num_processes=None,
                              total_frames=60, fps=30, time_step=0.1):
    """Render all frames in a process pool, then save them as a video or GIF.

    Args:
        output_file: Path of the file to write.
        format: ``'mp4'`` or ``'gif'``.
        num_processes: Pool size; defaults to ``mp.cpu_count()``.
        total_frames: Number of frames to render (default 60).
        fps: Frame rate passed to imageio when writing (default 30).
        time_step: Animation time added per frame (default 0.1).

    Raises:
        ValueError: If ``format`` is unsupported. Checked before the pool is
            started so no rendering work is wasted.
    """
    if format not in ('mp4', 'gif'):
        raise ValueError("Unsupported format. Choose 'mp4' or 'gif'.")

    if num_processes is None:
        num_processes = mp.cpu_count()

    # Each work item is (animation time, frame index); the wrapper only
    # uses the time.
    times = [(i * time_step, i) for i in range(total_frames)]

    # NOTE(review): the workers must be able to import create_frame_wrapper;
    # functions defined only in a notebook cell may not be reachable from
    # worker processes on every platform — confirm on the target OS.
    # The context manager releases the workers even if a frame raises.
    with mp.Pool(processes=num_processes) as pool:
        frames = pool.map(create_frame_wrapper, times)

    if format == 'mp4':
        with imageio.get_writer(output_file, fps=fps) as writer:
            for frame in frames:
                writer.append_data(frame)
    else:
        imageio.mimsave(output_file, frames, fps=fps)

    print(f"Animation saved as {output_file}")

# Only start rendering when this code runs as the main module; the call
# below creates a multiprocessing pool inside create_animation_parallel.
if __name__ == "__main__":
    # Create MP4 video
    create_animation_parallel('spiral_animation.mp4', format='mp4')

    # Alternative output: uncomment to write an animated GIF instead.
    # create_animation_parallel('spiral_animation.gif', format='gif')

In [8]:
def create_frame_wrapper(args):
    """Unpack one ``(time, index)`` work item and render the frame for it.

    The second element of ``args`` is discarded; only the time is passed on
    to ``create_spiral_frame``, whose result is returned as-is.
    """
    moment, _position = args
    return create_spiral_frame(moment)

def create_animation_parallel(output_file, format='mp4', num_processes=None,
                              total_frames=60, fps=30, time_step=0.1):
    """Render frames in a process pool with a progress bar, then save them.

    Args:
        output_file: Path of the file to write.
        format: ``'mp4'`` or ``'gif'``.
        num_processes: Pool size; defaults to ``mp.cpu_count()``.
        total_frames: Number of frames to render (default 60).
        fps: Frame rate passed to imageio when writing (default 30).
        time_step: Animation time added per frame (default 0.1).

    Raises:
        ValueError: If ``format`` is unsupported. Checked before the pool is
            started so no rendering work is wasted.
    """
    if format not in ('mp4', 'gif'):
        raise ValueError("Unsupported format. Choose 'mp4' or 'gif'.")

    if num_processes is None:
        num_processes = mp.cpu_count()

    # Each work item is (animation time, frame index); the wrapper only
    # uses the time.
    times = [(i * time_step, i) for i in range(total_frames)]

    frames = []
    # NOTE(review): the workers must be able to import create_frame_wrapper;
    # functions defined only in a notebook cell may not be reachable from
    # worker processes on every platform — confirm on the target OS.
    # The pool context manager releases the workers even if a frame raises.
    with mp.Pool(processes=num_processes) as pool:
        # imap yields results in submission order as they finish, so the
        # bar advances frame by frame instead of jumping at the end.
        with tqdm(total=len(times), desc="Generating frames") as pbar:
            for result in pool.imap(create_frame_wrapper, times):
                frames.append(result)
                pbar.update()

    print("Saving animation...")
    if format == 'mp4':
        with imageio.get_writer(output_file, fps=fps) as writer:
            for frame in tqdm(frames, desc="Writing frames"):
                writer.append_data(frame)
    else:
        imageio.mimsave(output_file, frames, fps=fps)

    print(f"Animation saved as {output_file}")

# Only start rendering when this code runs as the main module; the call
# below creates a multiprocessing pool inside create_animation_parallel.
if __name__ == "__main__":
    # Create MP4 video
    create_animation_parallel('spiral_animation.mp4', format='mp4')

Generating frames:   0%|          | 0/60 [00:00<?, ?it/s]

In [15]:
def run_animation(output_file, format='mp4'):
    """Run ``stub.py`` in a child process and mirror its progress in a bar.

    The child is started as ``<interpreter> stub.py <output_file> <format>``.
    Its standard output is read line by line; a JSON object whose
    ``"status"`` is ``"complete"`` finishes the bar and reports the saved
    file.

    Args:
        output_file: Output path forwarded to ``stub.py``.
        format: Output format forwarded to ``stub.py``.

    Returns:
        The child's exit code (always an integer: the function waits for the
        child to exit before returning).
    """
    import sys
    import tempfile

    total_frames = 60

    # The child's stderr goes to a temporary file instead of an unread pipe:
    # a pipe nobody drains blocks the child once the OS buffer fills up.
    with tempfile.TemporaryFile() as stderr_log:
        # sys.executable is the interpreter running this kernel, so the child
        # sees the same installed packages; a bare 'python' may resolve to a
        # different environment.
        process = subprocess.Popen([sys.executable, 'stub.py', output_file, format],
                                   stdout=subprocess.PIPE,
                                   stderr=stderr_log,
                                   universal_newlines=True)

        with process, tqdm(total=total_frames, desc="Generating frames") as pbar:
            # Iterating blocks until the child writes a line and stops at
            # EOF, so no polling or sleeping is needed. Reading continues to
            # EOF even after completion so the child can never block on a
            # full stdout pipe.
            for raw_line in process.stdout:
                line = raw_line.strip()
                if not line:
                    continue

                try:
                    data = json.loads(line)
                except json.JSONDecodeError:
                    data = None

                if isinstance(data, dict) and data.get('status') == 'complete':
                    pbar.update(total_frames - pbar.n)  # Ensure the bar completes
                    print(f"Animation saved as {data.get('output_file', output_file)}")
                    continue

                # NOTE(review): assumes stub.py prints one line per rendered
                # frame — confirm against stub.py. Clamped so extra lines
                # cannot push the bar past its total.
                if pbar.n < total_frames:
                    pbar.update(1)

            rc = process.wait()

        if rc != 0:
            # Surface whatever the child reported so a failure is not silent.
            stderr_log.seek(0)
            print(stderr_log.read().decode(errors='replace'), file=sys.stderr)

    return rc

In [16]:
# Run the animation: run_animation launches stub.py in a subprocess and
# passes it the output path and format given here.
# Create MP4 video
run_animation('spiral_animation.mp4', format='mp4')

# Uncomment to create GIF instead
# run_animation('spiral_animation.gif', format='gif')

Generating frames: 100%|██████████| 60/60 [00:10<00:00,  5.77it/s]

Animation saved as spiral_animation.mp4



