In [None]:
pip install torch opencv-python numpy moviepy together openai-whisper

In [None]:
import os
import random
import cv2
import numpy as np
import librosa
from moviepy.editor import VideoFileClip
import matplotlib.pyplot as plt
import matplotlib.image as mpimg

Sử dụng API Llama 3.2 để trích xuất đặc trưng từ video

In [None]:
import torch
from together import Together
import cv2
from PIL import Image
import whisper
from moviepy.editor import VideoFileClip
import librosa
import numpy as np
import json
import base64
from io import BytesIO

# Initialize Together client.
# The key is taken from the TOGETHER_API_KEY environment variable so that no
# credential has to be written into (and saved with) the notebook. When the
# variable is unset an empty string is passed, exactly as before.
client = Together(api_key=os.environ.get("TOGETHER_API_KEY", ""))

def convert_image_to_base64(image):
    """
    Encode an image as a base64 JPEG string.

    Args:
        image: Image object exposing ``mode``, ``convert`` and ``save``
            (a PIL ``Image`` in this notebook).

    Returns:
        str: UTF-8 text holding the base64 encoding of the JPEG bytes
        written with ``quality=85``.
    """
    # Every non-RGB mode is normalised to RGB before encoding.
    rgb_image = image if image.mode == 'RGB' else image.convert('RGB')

    # Serialise into memory instead of touching the filesystem.
    with BytesIO() as jpeg_stream:
        rgb_image.save(jpeg_stream, format='JPEG', quality=85)
        encoded = base64.b64encode(jpeg_stream.getvalue())
    return encoded.decode('utf-8')

def call_together_ai_api(image, query):
    """
    Ask the Llama 3.2 vision model (through the module-level Together client)
    to answer ``query`` about one image.

    Args:
        image: Image accepted by ``convert_image_to_base64``.
        query (str): Text prompt sent alongside the image.

    Returns:
        str: The model's reply with surrounding whitespace stripped, or a
        string starting with "Error generating caption: " if the request or
        the reply handling raises.
    """
    # Encoding happens before the try block, so an encoding failure propagates
    # to the caller instead of being turned into an error caption.
    data_url = f"data:image/jpeg;base64,{convert_image_to_base64(image)}"
    user_message = {
        "role": "user",
        "content": [
            {"type": "text", "text": query},
            {"type": "image_url", "image_url": {"url": data_url}},
        ],
    }

    try:
        completion = client.chat.completions.create(
            model="meta-llama/Llama-3.2-90B-Vision-Instruct-Turbo",
            messages=[user_message],
            max_tokens=300,
        )
        reply_text = completion.choices[0].message.content
        return reply_text.strip()
    except Exception as e:
        # Best effort: the failure is reported in place of the caption.
        return f"Error generating caption: {str(e)}"

# Function to interpret acoustic features into natural language description
def interpret_acoustic_features(y, sr):
    """
    Analyse an audio signal and render its acoustic features as a
    natural-language description.

    Args:
        y (np.ndarray): Audio signal.
        sr (int): Sampling rate.

    Returns:
        str: Comma-separated Vietnamese phrases describing the signal; a fixed
        "too short" message when the signal has fewer than ``sr / 10`` samples;
        or an error message if any analysis step raises.
    """
    # Empty or shorter than sr/10 samples: nothing meaningful to analyse.
    if not len(y) or len(y) < sr / 10:
        return "đoạn âm thanh quá ngắn để phân tích"

    # Frame parameters shared by every frame-based feature below.
    n_fft = 2048
    hop_length = 512

    try:
        phrases = []

        # Normalise the signal so the thresholds are applied consistently.
        signal = librosa.util.normalize(y)

        # MFCC: the mean of the first coefficient selects the pitch phrase.
        mfcc = librosa.feature.mfcc(
            y=signal, sr=sr, n_mfcc=13, n_fft=n_fft, hop_length=hop_length
        )
        first_coeff_mean = np.mean(mfcc, axis=1)[0]
        phrases.append(
            "tông giọng cao" if first_coeff_mean > 50
            else "tông giọng trầm" if first_coeff_mean < -50
            else "tông giọng trung bình"
        )

        # Zero-crossing rate: only computed when at least one full frame exists.
        if len(signal) >= n_fft:
            zcr = librosa.feature.zero_crossing_rate(
                y=signal, frame_length=n_fft, hop_length=hop_length
            )
            phrases.append(
                "nói nhanh và mạnh" if np.mean(zcr) > 0.1 else "nói chậm rãi và êm dịu"
            )

        # RMS energy; the threshold applies to the normalised signal.
        rms = librosa.feature.rms(y=signal, frame_length=n_fft, hop_length=hop_length)
        phrases.append(
            "nói lớn và mạnh mẽ" if np.mean(rms) > 0.1 else "nói nhẹ nhàng"
        )

        # Spectral centroid; the threshold scales with the sampling rate.
        centroid = librosa.feature.spectral_centroid(
            y=signal, sr=sr, n_fft=n_fft, hop_length=hop_length
        )
        phrases.append(
            "giọng sáng và rõ nét" if np.mean(centroid) > sr / 6 else "giọng trầm và ấm áp"
        )

        # Spectral bandwidth; the threshold scales with the sampling rate.
        bandwidth = librosa.feature.spectral_bandwidth(
            y=signal, sr=sr, n_fft=n_fft, hop_length=hop_length
        )
        phrases.append(
            "giọng sôi nổi và nhiều âm sắc" if np.mean(bandwidth) > sr / 8
            else "giọng đều đặn và ít biến động"
        )

        # Tempo estimate from the beat tracker; the beat positions are unused.
        tempo, _beats = librosa.beat.beat_track(y=signal, sr=sr)
        phrases.append(
            "nhịp độ nói nhanh" if tempo > 120
            else "nhịp độ nói chậm" if tempo < 80
            else "nhịp độ nói vừa phải"
        )

        return ", ".join(phrases)

    except Exception as e:
        # Best effort: report the failure as the description text.
        return f"lỗi khi phân tích: {str(e)}"

def extract_frames(video_path, num_frames=10):
    """
    Extract evenly spaced frames from a video.

    Args:
        video_path (str): Path of the video file to read.
        num_frames (int): Number of frames requested; capped at the frame
            count reported by OpenCV.

    Returns:
        list: Frames as returned by ``cap.read()``. Frames that cannot be read
        are skipped, so the list may be shorter than ``num_frames``. Empty
        when there is nothing to sample.

    Raises:
        ValueError: If the video file cannot be opened.
    """
    cap = cv2.VideoCapture(video_path)
    # try/finally guarantees the capture is released on every exit path,
    # including the "could not open" error and any failure while seeking.
    try:
        if not cap.isOpened():
            raise ValueError("Could not open video file.")

        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        num_frames = min(num_frames, total_frames)
        # A non-positive count (no frames reported, or none requested) would
        # make np.linspace raise or return nothing: there is nothing to read.
        if num_frames <= 0:
            return []

        frame_indices = np.linspace(0, total_frames - 1, num_frames, dtype=int)
        frames = []
        for idx in frame_indices:
            cap.set(cv2.CAP_PROP_POS_FRAMES, int(idx))
            success, frame = cap.read()
            if success:
                frames.append(frame)
        return frames
    finally:
        cap.release()

def frames_to_pil(frames):
    """
    Turn OpenCV frames into PIL images.

    Args:
        frames: Iterable of frames; each one is converted with
            ``cv2.COLOR_BGR2RGB`` before being wrapped.

    Returns:
        list: One ``Image`` per input frame, in the same order.
    """
    pil_images = []
    for bgr_frame in frames:
        rgb_frame = cv2.cvtColor(bgr_frame, cv2.COLOR_BGR2RGB)
        pil_images.append(Image.fromarray(rgb_frame))
    return pil_images

def extract_audio(video_path, audio_path):
    """
    Extract the audio track of a video into an audio file.

    Args:
        video_path (str): Path of the video to read.
        audio_path (str): Destination path; the audio is written with
            ``fps=16000``, ``nbytes=2`` and the ``pcm_s16le`` codec.

    Raises:
        ValueError: If the video has no audio track.
        Exception: Whatever ``VideoFileClip`` or ``write_audiofile`` raises.
    """
    # Open the clip BEFORE entering the try block: if opening fails there is
    # nothing to close, and the original error must reach the caller instead
    # of being masked by a failing cleanup on an unbound name.
    video = VideoFileClip(video_path)
    try:
        if video.audio is None:
            raise ValueError("Video has no audio track.")
        video.audio.write_audiofile(audio_path, fps=16000, nbytes=2, codec='pcm_s16le')
    finally:
        video.close()

# Function to transcribe audio using Whisper with optimized parameters for Vietnamese
def transcribe_audio(audio_path, model_name="large-v3", language="vi", task="transcribe", beam_size=5, best_of=5):
    """
    Run a Whisper model over an audio file and return its segments.

    Args:
        audio_path (str): Path to the audio file.
        model_name (str): Name passed to ``whisper.load_model``
            (e.g., "base", "small", "medium", "large").
        language (str): Language code for transcription (default: "vi").
        task (str): Either "transcribe" (speech-to-text) or "translate"
            (speech-to-English text).
        beam_size (int): Beam search size used while decoding (default: 5).
        best_of (int): Number of candidates considered while decoding (default: 5).

    Returns:
        list: The ``"segments"`` entry of the Whisper transcription result.

    Raises:
        ValueError: Wraps any exception raised while loading the model or
            transcribing.
    """
    decode_options = {
        "language": language,
        "task": task,
        "beam_size": beam_size,
        "best_of": best_of,
        # "fp16": False,  # Disable FP16 if running on CPU to prevent errors
    }

    try:
        # Note: the model is loaded afresh on every call.
        whisper_model = whisper.load_model(model_name)
        transcription = whisper_model.transcribe(audio_path, **decode_options)
        return transcription["segments"]
    except Exception as e:
        raise ValueError(f"Error transcribing audio with Whisper model {model_name}: {str(e)}")


def extract_acoustic_features(audio_path, segments):
    """
    Describe the acoustics of every transcription segment.

    Args:
        audio_path (str): Audio file, loaded with librosa at 16000 Hz.
        segments: Iterable of mappings with 'start', 'end' and 'text' keys;
            'start' and 'end' are multiplied by the sampling rate to obtain
            sample offsets.

    Returns:
        list[dict]: One dict per segment with the keys "start_time",
        "end_time", "utterance" and "acoustic_features". The last one is
        "Invalid time range." when the clamped sample range is empty.
    """
    y, sr = librosa.load(audio_path, sr=16000)
    total_samples = len(y)

    def _describe(segment):
        # Clamp the sample range to the loaded signal before slicing.
        first = max(0, int(segment['start'] * sr))
        last = min(total_samples, int(segment['end'] * sr))
        if first < last:
            return interpret_acoustic_features(y[first:last], sr)
        return "Invalid time range."

    features = []
    for segment in segments:
        description = _describe(segment)
        features.append({
            "start_time": segment['start'],
            "end_time": segment['end'],
            "utterance": segment['text'],
            "acoustic_features": description,
        })
    return features

Tổng hợp kết quả

In [None]:
def process_video(video_path, output_path, query="Describe this picture with a 15-word limit sentence. Ignore subtitles in the video. When describing, don’t use 'The image depicts,' start directly, e.g., 'a man in a red shirt...'"):
    """
    Process a video to extract audio, acoustic features, and frame captions.

    The audio track is written to a temporary WAV file, transcribed into
    segments, and each segment gets an acoustic description. As many evenly
    spaced frames as there are segments are captioned, and the i-th caption is
    paired with the i-th segment.

    Args:
        video_path (str): Path of the video to process.
        output_path (str): Path of the JSON file the results are written to.
        query (str): Prompt sent with every frame to the captioning model.

    Returns:
        list[dict]: One entry per segment with "segment_id", "start_time",
        "end_time", "utterance", "acoustic_features" and "visual_description".

    Raises:
        Exception: Any error from a processing step is printed and re-raised.
    """
    audio_path = "temp_audio.wav"
    missing_caption = "Error generating caption"
    try:
        print("Extracting audio...")
        extract_audio(video_path, audio_path)

        print("Transcribing audio...")
        segments = transcribe_audio(audio_path)
        print(segments)

        print("Extracting acoustic features...")
        acoustic_features = extract_acoustic_features(audio_path, segments)

        # NOTE(review): frames are spread evenly over the whole video, not
        # taken at each segment's timestamps, so caption i is only loosely
        # related to segment i in time — confirm this is intended.
        print(f"Extracting {len(segments)} frames...")
        frames = extract_frames(video_path, num_frames=len(segments))
        pil_frames = frames_to_pil(frames)

        print("Generating visual descriptions...")
        frame_captions = [
            call_together_ai_api(frame, query) if frame is not None else missing_caption
            for frame in pil_frames
        ]

        # extract_frames skips unreadable frames and caps the request at the
        # video's frame count, so there may be fewer captions than segments.
        # Pad with the fallback caption so zip() below cannot silently drop
        # the trailing segments (and their transcripts) from the output.
        shortfall = len(acoustic_features) - len(frame_captions)
        if shortfall > 0:
            print(f"Warning: {shortfall} segment(s) have no frame; using fallback caption.")
            frame_captions.extend([missing_caption] * shortfall)

        results = [
            {
                "segment_id": i,
                "start_time": feature["start_time"],
                "end_time": feature["end_time"],
                "utterance": feature["utterance"],
                "acoustic_features": feature["acoustic_features"],
                "visual_description": caption
            }
            for i, (feature, caption) in enumerate(zip(acoustic_features, frame_captions))
        ]

        print(f"Saving results to {output_path}...")
        with open(output_path, 'w', encoding='utf-8') as f:
            # ensure_ascii=False keeps the non-ASCII text readable in the file.
            json.dump(results, f, ensure_ascii=False, indent=2)

        print("Processing complete!")
        return results

    except Exception as e:
        print(f"Error in process_video: {str(e)}")
        raise
    finally:
        if os.path.exists(audio_path):
            os.remove(audio_path)  # Clean up temporary file

Lưu kết quả thành file JSON

In [None]:
video_path = "/kaggle/input/temp-videos2/189 (2).mp4"  # Replace with your video path
output_path = "/kaggle/working/output.json"  # Replace with desired output path
# The call that can actually fail is process_video, so it belongs inside the
# try block; previously only the print was guarded and the handler was dead.
try:
    result = process_video(video_path, output_path)
    print(result)
except Exception as e:
    print(f"An error occurred during processing: {str(e)}")

# Example usage: process a range of numbered videos, one JSON file per video.
results = []
for i in range(189, 190):
    video_path = f"/kaggle/input/videos/{i}.mp4"  # Replace with your video path
    output_path = f"/kaggle/working/output_{i}.json"  # Replace with desired output path
    # Guard the processing call itself so that one failing video is reported
    # and skipped instead of aborting the whole batch.
    try:
        result = process_video(video_path, output_path)
    except Exception as e:
        print(f"An error occurred during processing: {str(e)}")
        continue
    results.append(result)
    print(result)

# Shell command: recursively zip /kaggle/working into compressed_folder.zip
# (the archive itself is written inside that same directory).
!zip -r /kaggle/working/compressed_folder.zip /kaggle/working

In [None]:
import pandas as pd

# Read the saved JSON results back into a DataFrame; the bare name on the
# last line makes the notebook display it.
df = pd.read_json(path_or_buf="/kaggle/working/output.json")
df