# Qwen2.5-Omni

## Packages and dependencies

In [12]:
!pip uninstall transformers -y
!pip install git+https://github.com/huggingface/transformers@v4.51.3-Qwen2.5-Omni-preview
!pip install accelerate
# It's highly recommended to use `[decord]` feature for faster video loading.
!pip install qwen-omni-utils[decord] -U
!pip install accelerate
!pip install ipywidgets

Found existing installation: transformers 4.52.0.dev0
Uninstalling transformers-4.52.0.dev0:
  Successfully uninstalled transformers-4.52.0.dev0
Collecting git+https://github.com/huggingface/transformers@v4.51.3-Qwen2.5-Omni-preview
  Cloning https://github.com/huggingface/transformers (to revision v4.51.3-Qwen2.5-Omni-preview) to /tmp/pip-req-build-4kjjdthl
  Running command git clone --filter=blob:none --quiet https://github.com/huggingface/transformers /tmp/pip-req-build-4kjjdthl
  Running command git checkout -q f551466201fb4089d6df9dc55d80f0edbd149d85
  Resolved https://github.com/huggingface/transformers to commit f551466201fb4089d6df9dc55d80f0edbd149d85
  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
Building wheels for collected packages: transformers
  Building wheel for transformers (pyproject.toml) ... [?25l[?25hdone
  Created wheel for transformers: f

In [None]:
# Clone the companion repository into the working directory.
# NOTE(review): none of the cells visible here import from it — confirm it is still needed.
!git clone https://github.com/DrCet/multimodal-model-inference-and-finetuning

In [None]:
import torch
from transformers import (
    HfArgumentParser,
    Qwen2_5OmniForConditionalGeneration,
    Qwen2_5OmniProcessor
)
from qwen_omni_utils import process_mm_info
import sys 
import os
import re
import soundfile as sf 
from IPython.display import Audio, display, clear_output
import time
import ipywidgets as widgets

## Check FlashAttention-2 support (`is_flash_attention_2_supported`)

In [None]:
def is_flash_attention_2_supported():
    """Report whether the ``flash_attention_2`` attention backend can be used.

    The checks run in this order and the reason for the first failure is printed:

    1. a CUDA device is available;
    2. GPU 0 has compute capability >= 8.0;
    3. PyTorch is >= 2.2;
    4. the CUDA version PyTorch reports is >= 11.7;
    5. the ``flash_attn`` package is importable.

    Returns:
        bool: True when every check passes; False otherwise, including when a
        check raises an exception.
    """
    # Local import so the function does not depend on another cell's imports.
    import importlib.util

    try:
        # Check CUDA availability and compute capability
        if not torch.cuda.is_available():
            print("CUDA not available.")
            return False
        device_props = torch.cuda.get_device_properties(0)
        compute_capability = device_props.major * 10 + device_props.minor
        if compute_capability < 80:  # Need compute capability >= 8.0
            print(f"GPU compute capability {compute_capability/10} is not supported (requires >= 8.0).")
            return False

        # Check PyTorch and CUDA versions
        torch_version = torch.__version__.split("+")[0]  # drop local build tag, e.g. "+cu121"
        torch_major, torch_minor = map(int, torch_version.split(".")[:2])
        cuda_version = torch.version.cuda
        cuda_major, cuda_minor = map(int, cuda_version.split(".")[:2]) if cuda_version else (0, 0)
        if (torch_major, torch_minor) < (2, 2):
            print(f"PyTorch {torch_version} is not supported (requires >= 2.2).")
            return False
        if (cuda_major, cuda_minor) < (11, 7):
            print(f"CUDA {cuda_version} is not supported (requires >= 11.7).")
            return False

        # Check Flash Attention availability.
        # `torch.backends.cuda.flash_sdp_enabled()` only reports whether PyTorch's own
        # SDPA flash kernel is switched on; the "flash_attention_2" implementation needs
        # the separate `flash_attn` package, so look for that package instead.
        if importlib.util.find_spec("flash_attn") is None:
            print("FlashAttention-2 is not available. Ensure flash-attn >= 2.1.0 is installed.")
            return False

        print("FlashAttention-2 is supported.")
        return True
    except Exception as e:
        print(f"Error checking FlashAttention-2 compatibility: {e}")
        return False

## Configuration (model and generation arguments)

In [None]:
# Hugging Face checkpoint used for both the model and the processor.
model_name_or_path = "Qwen/Qwen2.5-Omni-3B"

# Placement, precision and checkpoint-loading options forwarded to `from_pretrained`.
device_map, torch_dtype, weights_only = 'auto', 'auto', False

# Prefer FlashAttention-2 when the environment check passes; otherwise fall back to SDPA.
if is_flash_attention_2_supported():
    attn_implementation = "flash_attention_2"
else:
    attn_implementation = "sdpa"

# Speaker name passed to `model.generate`; 'Chelsie' is the alternative noted by the author.
speaker = 'Ethan'

## Load model

In [16]:
# Configure PyTorch's CUDA allocator through its environment variable.
os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True"

# Release cached GPU memory on every visible device before loading the weights.
for device_index in range(torch.cuda.device_count()):
    with torch.cuda.device(device_index):
        torch.cuda.empty_cache()

# Load the model with the options chosen in the configuration cell.
model = Qwen2_5OmniForConditionalGeneration.from_pretrained(
    model_name_or_path,
    attn_implementation=attn_implementation,
    torch_dtype=torch_dtype,
    device_map=device_map,
    weights_only=weights_only,
)

# The processor comes from the same checkpoint as the model.
processor = Qwen2_5OmniProcessor.from_pretrained(model_name_or_path)

Loading checkpoint shards:   0%|          | 0/3 [00:00<?, ?it/s]

  for key, value in torch.load(path).items():


## Preprocess

In [17]:
# Switches read by `prepare_inputs` (chat template / processor) and by `chat` (generation).
use_audio_in_video = False
tokenize = False
add_generation_prompt = True

# Uploaded images and videos are written into this directory before preprocessing.
os.makedirs('image-video', exist_ok=True)

In [24]:
def extract_prompt_elements(prompt, uploaded_files=None, verbose=False):
    """Split a user prompt into plain text and referenced media.

    Args:
        prompt (str): Raw text typed by the user. URLs and file paths with a
            known media extension are pulled out of it.
        uploaded_files (dict | None): Maps ``'image'`` / ``'video'`` to a sequence
            of upload records, each a mapping with ``'name'`` and ``'content'``
            entries. Accepted files are written to ``image-video/`` and their
            saved paths are added to the result. Other keys are ignored.
        verbose (bool): Currently unused; kept for interface compatibility.

    Returns:
        dict: ``{"images": [...], "audio": [...], "video": [...], "text": str}``.
        ``text`` is the prompt with every matched URL/path removed and whitespace
        collapsed. A bare ``http(s)://`` or ``www.`` URL matches all three
        patterns regardless of extension, so it appears in all three lists;
        callers are expected to filter by extension.
    """
    # Regex patterns. Group 1 is the match including the trailing boundary
    # character; group 2 is the bare URL/path; group 3 is the extension.
    image_pattern = re.compile(r'((https?://[^\s<>"]+|www\.[^\s<>"]+|[^\s<>"]+\.(jpg|jpeg|png|gif|bmp))($|[^\w]))')
    audio_pattern = re.compile(r'((https?://[^\s<>"]+|www\.[^\s<>"]+|[^\s<>"]+\.(wav|mp3|ogg|aac|flac))($|[^\w]))')
    video_pattern = re.compile(r'((https?://[^\s<>"]+|www\.[^\s<>"]+|[^\s<>"]+\.(mp4|avi|mov|wmv|mkv))($|[^\w]))')

    elements = {
        "images": [],
        "audio": [],
        "video": [],
        "text": prompt
    }

    if uploaded_files:
        # upload type -> (extension check, result key)
        upload_targets = {
            'image': (is_image_file, 'images'),
            'video': (is_video_file, 'video'),
        }
        os.makedirs('image-video', exist_ok=True)
        for file_type, file_info in uploaded_files.items():
            if file_type not in upload_targets:
                continue
            accepts, key = upload_targets[file_type]
            for uploaded in (file_info or ()):
                # Keep only the final path component so an upload name cannot
                # point outside the target directory.
                filename = os.path.basename(uploaded['name'])
                if not accepts(filename):
                    continue
                saved_path = f'image-video/{filename}'
                with open(saved_path, 'wb') as f:
                    f.write(uploaded['content'])
                elements[key].append(saved_path)

    # Extract URLs and files
    for pattern, key in [
        (video_pattern, "video"),
        (audio_pattern, "audio"),
        (image_pattern, "images"),
    ]:
        matches = pattern.findall(prompt)
        if not matches:
            continue
        # Index 1 is the URL/path without the boundary character; index 0 would
        # carry a trailing space or punctuation mark and fail later extension checks.
        elements[key].extend(match[1] for match in matches)
        # Remove every occurrence from the prompt in one pass to isolate the text.
        elements["text"] = pattern.sub(' ', elements["text"])

    # Clean up text (remove extra spaces)
    elements["text"] = ' '.join(elements["text"].split())

    return elements

In [25]:
def is_image_file(path):
    """Return True when ``path`` is a string ending in an image extension.

    The comparison is case-insensitive. Non-string values return False instead
    of raising.
    """
    # Explicit type guard instead of a bare `except:`, which would also swallow
    # KeyboardInterrupt and SystemExit.
    if not isinstance(path, str):
        return False
    return path.lower().endswith(('.jpg', '.jpeg', '.png', '.gif', '.bmp'))

def is_audio_file(path):
    """Return True when ``path`` is a string ending in an audio extension.

    The comparison is case-insensitive. Non-string values return False instead
    of raising.
    """
    # Explicit type guard instead of a bare `except:`, which would also swallow
    # KeyboardInterrupt and SystemExit.
    if not isinstance(path, str):
        return False
    return path.lower().endswith(('.wav', '.mp3', '.ogg', '.aac', '.flac'))

def is_video_file(path):
    """Return True when ``path`` is a string ending in a video extension.

    The comparison is case-insensitive. Non-string values return False instead
    of raising.
    """
    # Explicit type guard instead of a bare `except:`, which would also swallow
    # KeyboardInterrupt and SystemExit.
    if not isinstance(path, str):
        return False
    return path.lower().endswith(('.mp4', '.avi', '.mov', '.wmv', '.mkv'))
def prepare_inputs(conversation=None, elements=None):
    """Append a user turn to the conversation and build the model inputs.

    Args:
        conversation (list | None): Chat history as a list of message dicts.
            It is extended in place with the new user message. When None, a new
            history is started.
        elements (dict): Required. Result of ``extract_prompt_elements`` with the
            keys ``'text'``, ``'images'``, ``'audio'`` and ``'video'``.

    Returns:
        tuple: ``(conversation, inputs)`` where ``inputs`` is the processor
        output moved to the model's device and dtype.
    """
    if conversation is None:
        # The declared default is None; start a new history instead of failing on append.
        conversation = []

    user_message = {
        'role': 'user',
        'content': [
            {'type': 'text', 'text': elements['text']}
        ]
    }

    # Add each media reference under the key that matches its type. Entries whose
    # extension does not match the list they arrived in are skipped.
    for image in elements['images']:
        if is_image_file(image):
            user_message['content'].append({'type': 'image', 'image': image})

    for audio in elements['audio']:
        if is_audio_file(audio):
            user_message['content'].append({'type': 'audio', 'audio': audio})

    for video in elements['video']:
        if is_video_file(video):
            # The payload key must be 'video' (it was 'image'), so the path is
            # loaded as a video rather than handed to the image loader.
            user_message['content'].append({'type': 'video', 'video': video})

    conversation.append(user_message)
    text = processor.apply_chat_template(conversation, add_generation_prompt=add_generation_prompt, tokenize=tokenize)
    audios, images, videos = process_mm_info(conversation, use_audio_in_video=use_audio_in_video)
    inputs = processor(text=text, audio=audios, images=images, videos=videos, return_tensors="pt", padding=True, use_audio_in_video=use_audio_in_video)
    # Move inputs to model dtype and device
    inputs = inputs.to(model.device).to(model.dtype)
    return conversation, inputs

## Define chat

In [28]:
def chat():
    """Display an ipywidgets chat UI backed by the loaded Qwen2.5-Omni model.

    The UI has an image upload, a video upload, a text box and a Submit button.
    Each submission:

    1. extracts text and media from the prompt and uploads;
    2. appends a user turn to the history and builds model inputs;
    3. calls ``model.generate`` for text token ids and, optionally, audio;
    4. prints the exchange and appends the reply to the history as an
       ``assistant`` turn;
    5. writes any generated audio to ``.generated_audio/response.wav`` and plays it.

    Typing ``exit`` prints a message and skips the submission. If preprocessing
    or generation fails, the error is printed and the failed user turn is removed
    from the history. Returns None.
    """
    sample_rate = 24000  # sample rate used when writing the generated audio
    audio_dir = '.generated_audio'
    output_audio = ".generated_audio/response.wav"

    conversation = [
            {
                "role": "system",
                "content": [
                    {"type": "text", "text": "You are Qwen, a virtual human developed by the Qwen Team, Alibaba Group, capable of perceiving auditory and visual inputs, as well as generating text and speech."}
                ]
            }
        ]
    # Create chat interface
    prompt_widget = widgets.Text(
        value='',
        placeholder='Type your prompt (e.g., Describe image1.jpg and image2.jpg)',
        description='You:',
        layout={'width': '500px'}
    )
    submit_button = widgets.Button(
        description='Submit',
        button_style='primary',
        tooltip='Click to submit prompt'
    )

    image_upload = widgets.FileUpload(
        accept='.jpg,.jpeg,.png,.gif,.bmp',
        multiple=False,
        description='Image'
    )

    video_upload = widgets.FileUpload(
        accept='.mp4',
        multiple=False,
        description='Video'
    )
    output = widgets.Output()

    def on_submit(button):
        """Handle one click of the Submit button (one chat turn)."""
        nonlocal conversation
        with output:
            prompt = prompt_widget.value.strip()
            if prompt.lower() == 'exit':
                print("Chat ended.")
                return
            prompt_widget.value = ''  # Clear input after submission

            uploaded_files = {'image': image_upload.value, 'video': video_upload.value}

            # Remember the history length so a failed turn can be rolled back;
            # otherwise the unanswered user message stays in every later request.
            turns_before = len(conversation)
            stage = "Preprocessing"
            try:
                elements = extract_prompt_elements(prompt, uploaded_files, verbose=True)
                conversation, inputs = prepare_inputs(conversation, elements)
                stage = "Inference"
                text_ids, audio = model.generate(**inputs, use_audio_in_video=use_audio_in_video, speaker=speaker)
            except Exception as e:
                del conversation[turns_before:]
                print(f"{stage} failed: {e}")
                return
            finally:
                # Clear the upload widgets whether or not the turn succeeded.
                image_upload.value = ()
                video_upload.value = ()

            text = processor.batch_decode(text_ids, skip_special_tokens=True, clean_up_tokenization_spaces=False)
            # The decoded text holds the whole dialogue; keep what follows the last assistant marker.
            response = text[0].split('assistant\n')[-1]
            # Store the reply as an assistant turn (it was stored as 'system').
            conversation.append({
                'role': 'assistant',
                'content': [{'type': 'text', 'text': response}]
            })
            print(f'User: {prompt}')
            print(f'Assistant: {response}')

            if audio is not None:
                os.makedirs(audio_dir, exist_ok=True)
                # Cast to float32 so NumPy conversion also works for half-precision tensors.
                waveform = audio.reshape(-1).detach().cpu().float().numpy()
                sf.write(output_audio, waveform, samplerate=sample_rate)
                display(Audio(output_audio, autoplay=True))
                # Wait for roughly the clip duration, computed from the waveform
                # rather than by re-reading the file that was just written.
                time.sleep(len(waveform) / sample_rate + 0.5)

    submit_button.on_click(on_submit)
    display(widgets.VBox([image_upload, video_upload, prompt_widget, submit_button, output]))

In [30]:
# Build and display the chat UI; each turn runs when the Submit button is clicked.
chat()

VBox(children=(FileUpload(value=(), accept='.jpg,.jpeg,.png,.gif,.bmp', description='Image'), FileUpload(value…