In [1]:

import shutil
import subprocess
import numpy as np
import cv2
import os
import torch
import torchaudio
def normalize_volumes(volumes):
    """
    Normalizes the volumes to the range [0, 1] for each volume independently in the batch across all frames.

    Args:
        volumes: Tensor of shape (B, T, H, W) representing the volumes.

    Returns:
        normalized_volumes: Tensor of the same shape as input, normalized to [0, 1].
    """
    # Compute min and max for each volume independently (across all T, H, W)
    min_vals = volumes.reshape(volumes.shape[0], -1).min(dim=1, keepdim=True)[0]  # Shape: [B, 1]
    max_vals = volumes.reshape(volumes.shape[0], -1).max(dim=1, keepdim=True)[0]  # Shape: [B, 1]

    # Reshape for broadcasting
    min_vals = min_vals.view(-1, 1, 1, 1)  # Shape: [B, 1, 1, 1]
    max_vals = max_vals.view(-1, 1, 1, 1)  # Shape: [B, 1, 1, 1]

    # Handle edge case where min equals max (flat volume)
    epsilon = 1e-8
    divisor = torch.clamp(max_vals - min_vals, min=epsilon)

    # Normalize each volume to [0, 1] range (across all frames)
    normalized_volumes = (volumes - min_vals) / divisor
    return normalized_volumes

class MatchmapVideoGeneratorDenseAV:
    def __init__(self,model, device, img, audio_path,  matchmap_path = None):
        self.model = model
        self.device = device
        self.audio_waveform, self.sample_rate = torchaudio.load(audio_path)
        self.image = img.unsqueeze(0).to(device)  # Add batch dimension and move to device
        #image [b,c,h,w] but for DenseAV it is[k,b,c,h,w] k for head

        if matchmap_path is not None:
            self.matchmap = torch.load(matchmap_path, map_location=device)
        else:
            self.matchmap = None
        self.audio_length = self.audio_waveform.shape[1] / self.sample_rate
        print(f"Audio length in seconds: {self.audio_length:.2f}")
        self.video = None


    def compute_matchmap(self):
        with torch.no_grad():
            aud_feats = self.model.forward_audio({"audio": self.audio_waveform.cpu()})
            aud_feats = {k: v.cpu() for k, v in aud_feats.items()}
            print(f"Extracted audio_feats['audio_feats'] tensor shape: {aud_feats['audio_feats'].shape}")

            img_feats = self.model.forward_image({"frames": self.image.unsqueeze(0).cpu()}, max_batch_size=1)
            img_feats = {k: v.cpu() for k,v in img_feats.items()}
            print(f"Extracted image_feats['image_feats'] tensor shape: {img_feats['image_feats'].shape}")

            aud_emb = aud_feats['audio_feats'].squeeze(2).squeeze(0)  # [c, t]
            img_emb = img_feats['image_feats'].squeeze(0)
            self.matchmap = torch.einsum('ct, chw -> thw',aud_emb,img_emb)
            print(f"Computed matchmap tensor shape: {self.matchmap.shape}")
            print(f"Number of simframes/sec : {self.matchmap.shape[0] / self.audio_length:.2f}")
        return self.matchmap
    
    def normalize_img(self, value, vmax=None, vmin=None):
        '''
        Normalize heatmap
        '''
        vmin = value.min() if vmin is None else vmin
        vmax = value.max() if vmax is None else vmax
        if not (vmax - vmin) == 0:
            value = (value - vmin) / (vmax - vmin)  # vmin..vmax
        return value
    
    def get_frame_match(self, img_np, matchmap_np, frame_idx):
        assert img_np.ndim == 3, "img_np should be a 3D numpy array"
        assert matchmap_np.ndim == 3, "matchmap_np should be a 3D numpy array"

        matchmap_i = matchmap_np[frame_idx]
        matchmap_i = cv2.resize(matchmap_i, dsize=(224, 224), interpolation=cv2.INTER_LINEAR)
        # if self.args.normalize_volumes_thw:
        #     pass
        # else:
        #     matchmap_i = self.normalize_img(matchmap_i)
        matchmap_i_photo = (matchmap_i * 255).astype(np.uint8)
        matchmap_i_photo = cv2.applyColorMap(matchmap_i_photo, cv2.COLORMAP_JET)
        matchmap_i_photo = cv2.addWeighted(matchmap_i_photo, 0.5, img_np, 0.5, 0)
        return matchmap_i_photo
    
    def create_video_f(self,img_np, matchmap_np, output_path="matchmap_video.mp4", fps=1):
        n_frames = matchmap_np.shape[0]
        
        # Make sure img_np is in uint8 format
        if img_np.dtype != np.uint8:
            img_np = (img_np * 255).astype(np.uint8)
        
        # Make sure img_np has correct dimensions (224, 224, 3)
        if img_np.shape[:2] != (224, 224):
            img_np = cv2.resize(img_np, (224, 224))
        
        # Use proper codec for compatibility
        # For better compatibility, try 'avc1' or 'H264' instead of 'mp4v'
        fourcc = cv2.VideoWriter_fourcc(*'MP4V') 
        
        # Alternative codec options if 'avc1' doesn't work:
        # fourcc = cv2.VideoWriter_fourcc(*'H264')
        # fourcc = cv2.VideoWriter_fourcc(*'XVID')  # More compatible but lower quality
        
        out = cv2.VideoWriter(output_path, fourcc, fps, (224, 224))
        
        if not out.isOpened():
            print("Failed to create VideoWriter. Trying alternative codec...")
            # Try with different codec
            fourcc = cv2.VideoWriter_fourcc(*'XVID')
            out = cv2.VideoWriter(output_path.replace('.mp4', '.avi'), fourcc, fps, (224, 224))
        
        for i in range(n_frames):
            frame = self.get_frame_match(img_np, matchmap_np, i)
            
            # Ensure frame is the correct format
            if frame.dtype != np.uint8:
                frame = (frame * 255).astype(np.uint8)
                
            # Ensure frame has the right shape
            if frame.shape[:2] != (224, 224):
                frame = cv2.resize(frame, (224, 224))
                
            # Verify frame is BGR (OpenCV's default format)
            if len(frame.shape) == 3 and frame.shape[2] == 3:
                out.write(frame)
            else:
                print(f"Warning: Frame {i} has incorrect format. Shape: {frame.shape}")
        
        out.release()
        print(f"Video created at: {output_path}")
        
        # Verify the file was created and has a non-zero size
        if os.path.exists(output_path) and os.path.getsize(output_path) > 0:
            print(f"Success! Video file created: {os.path.getsize(output_path)} bytes")
        else:
            print("Error: Video file was not created properly")
            

    def create_video(self,output_path):
        img_np = self.image[0].cpu().numpy()
        img_np = np.transpose(img_np, (1, 2, 0))
        img_np = self.normalize_img(img_np)
        img_np = (img_np * 255).astype(np.uint8)
        img_np = cv2.cvtColor(img_np, cv2.COLOR_RGB2BGR)

        if self.matchmap is None:
            self.matchmap = self.compute_matchmap()
        
        # if self.args.normalize_volumes_thw:
        self.matchmap = normalize_volumes(self.matchmap.unsqueeze(0))
        self.matchmap = self.matchmap.squeeze(0)

        matchmap_np = self.matchmap.cpu().numpy()
        n_frames = matchmap_np.shape[0]
        self.create_video_f(img_np, matchmap_np, output_path, fps=n_frames/ self.audio_length) 


    def add_audio_to_video(self, video_path, audio_path):
        """
        Add audio to video using ffmpeg and overwrite the output file if it exists.
        """
        temp_output = "temp_output.mp4"
        
        # Ensure the audio file exists
        if not os.path.exists(audio_path):
            raise Exception("Error: Audio file not found.")

        # Use ffmpeg to merge audio and video
        command = [
            "ffmpeg",
            "-y",  # Overwrite output files without asking
            "-i", video_path,  # Input video
            # "-stream_loop", "-1",  # Infinite loop for audio
            "-i", audio_path,  # Input audio
            "-map", "0:v",  # Video stream from first input
            "-map", "1:a",  # Audio stream from second input
            "-c:v", "copy",  # Copy video codec (no re-encoding)
            "-c:a", "libmp3lame",  # Encode audio in mp3 format
            temp_output
        ]
        
        try:
            subprocess.run(command, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE, check=True)
            if os.path.exists(temp_output):  # Ensure the temporary file exists
                os.remove(video_path)  # Delete the original video file
                shutil.move(temp_output, video_path)  # Rename the temporary file
            else:
                raise Exception("Error: Temporary output file not created.")
        except subprocess.CalledProcessError as e:
            raise Exception(f"Error during ffmpeg execution: {e.stderr.decode('utf-8')}")

    def create_video_with_audio(self, output_path, audio_path):
        self.create_video(output_path)
        self.add_audio_to_video(output_path, audio_path)

In [2]:
import torch
import utils.util as u

import numpy as np

json_file = os.path.expandvars('$DATA/PlacesAudio_400k_distro/metadata/val.json')
gs = u.GetSampleFromJson(json_file, local_dir="/home/asantos/code/DenseAV/denseav.egg-info", padvalue=0)
image_path, audio_path = gs.get_sample(23)
image = gs.load_image(image_path)
print(f"image tensor shape: {image.shape}")

#use the class to compute matchmap
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

matchmap_generator = MatchmapVideoGeneratorDenseAV(model=None, device=device, img=image, audio_path=audio_path, matchmap_path="matchmap.pt")
matchmap_generator.create_video_with_audio(output_path="matchmap_denseav.mp4", audio_path=audio_path)

  from .autonotebook import tqdm as notebook_tqdm


image tensor shape: torch.Size([3, 224, 224])
Using device: cpu
Audio length in seconds: 7.89


OpenCV: FFMPEG: tag 0x5634504d/'MP4V' is not supported with codec id 12 and format 'mp4 / MP4 (MPEG-4 Part 14)'
OpenCV: FFMPEG: fallback to use tag 0x7634706d/'mp4v'


Video created at: matchmap_denseav.mp4
Success! Video file created: 233274 bytes
