In [1]:
import json
import torch
import torchaudio
import torch.nn.functional as F
import os
import numpy as np
from torch.utils.data import Dataset
from torch.utils.data.sampler import BatchSampler

import librosa
import soundfile as sf

import random

class CLAPDataset(Dataset):
    def __init__(self, json_path, processor, sample_rate=48000, max_annotations=None):
        self.processor = processor
        self.sample_rate = sample_rate
        self.dirname = os.path.dirname(json_path)

        task_filter="audiocaption"

        # JSON 파일 로드
        with open(json_path, "r") as f:
            data = json.load(f)
        # task가 audiocaption인 annotation만 필터링
        filtered_annotations = [
            entry for entry in data["annotation"] if entry["task"] == task_filter
        ]
        self.annotations = filtered_annotations

        if max_annotations is not None:
            self.annotations = random.sample(filtered_annotations, min(len(filtered_annotations), max_annotations))

    def __len__(self):
        return len(self.annotations)

    def __getitem__(self, idx):
        entry = self.annotations[idx]
        audio_path = os.path.join(self.dirname, entry["path"])
        text = entry["text"]

        # # 오디오 로드
        # audio, sr = sf.read(audio_path)
        # # 모노 채널로 변환 (스테레오인 경우)
        # if waveform.shape[0] > 1:  # Check if stereo
        #     audio = torch.mean(waveform, dim=0, keepdim=True) # 평균 내기
            
        # if sr != self.sample_rate:
        #     waveform = torchaudio.transforms.Resample(sr, self.sample_rate)(waveform)

        # librosa를 사용하여 오디오 로드 및 모노 채널 변환, resample
        waveform, sr = librosa.load(audio_path, sr=self.sample_rate, mono=True) # 한줄로 처리

        length = len(waveform)  # (채널, 길이) 중 길이
        
        waveform = torch.from_numpy(waveform).float() # tensor로 변환 및 차원 추가

        # 오디오 길이 저장 (패딩 최소화를 위해 사용)

        return {"path": audio_path, "waveform": waveform, "text": text, "length": length}

def pad_collate_fn(batch):
    # 배치 내 최대 길이 찾기
    max_length = max(map(lambda x: x["length"], batch))

    paths = []
    # 패딩 적용
    padded_waveforms = []
    texts = []
    lengths = []
    
    for item in batch:
        path = item["path"]
        waveform = item["waveform"]
        text = item["text"]
        length = item["length"]

        # 오른쪽 패딩 적용
        pad_amount = max_length - length
        padded_waveform = F.pad(waveform, (0, pad_amount))  # (채널, 길이)
        
        paths.append(path)
        padded_waveforms.append(padded_waveform)
        texts.append(text)
        lengths.append(length)

    # 텐서 변환
    padded_waveforms = torch.stack(padded_waveforms)
    return {"paths": paths, "waveforms": padded_waveforms, "texts": texts, "lengths": lengths}

class LengthBatchSampler(BatchSampler):
    """ 비슷한 길이끼리 배치하여 패딩을 최소화하는 샘플러 """
    def __init__(self, dataset, batch_size):
        sorted_indices = sorted(range(len(dataset)), key=lambda i: dataset[i]["length"])
        batches = [sorted_indices[i:i+batch_size] for i in range(0, len(sorted_indices), batch_size)]
        super().__init__(batches, batch_size, drop_last=False)

In [2]:
import os
import shutil

def recreate_output_folder(output_path):
    """
    output 폴더를 삭제하고 새로 생성합니다.

    Args:
        output_path (str): output 폴더 경로
    """
    try:
        # output 폴더가 존재하면 삭제
        if os.path.exists(output_path):
            shutil.rmtree(output_path)  # 하위 파일 및 폴더 모두 삭제
            print(f"기존 output 폴더({output_path})를 삭제했습니다.")

        # output 폴더 생성
        os.makedirs(output_path)
        print(f"새로운 output 폴더({output_path})를 생성했습니다.")

    except Exception as e:
        print(f"오류 발생: {e}")

# 사용 예시
output_path = "output"  # output 폴더 경로

In [3]:
from tqdm import tqdm

def process(device, model, processor, dataloader, similarity_threshold =0.5):
    results=[]
    # 배치 처리
    with torch.no_grad(), tqdm(total=len(dataloader), desc="Processing Batches") as pbar:  # tqdm 적용
        for batch in dataloader:
            paths = batch["paths"]
            waveforms = batch["waveforms"]  # (batch, 채널, 길이)
            texts = batch["texts"]
            lengths = batch.get("lengths") # lengths 정보 추가 (패딩 고려)

            # 오디오 전처리 (💡 배치 단위 변환)
            inputs = processor(text=texts, audios=waveforms, return_tensors="pt", sampling_rate=48000, padding=True)
            outputs = model(**inputs)

            logits_per_audio = outputs.logits_per_audio  # this is the audio-text similarity score
            print(logits_per_audio)
            # 배치 내의 각 샘플에 대해 결과 저장
            # for i in range(len(logits_per_audio)):
                
            #     if similarities[i] <= similarity_threshold:
            #         results.append({
            #             "similarity": similarities[i],
            #             "path": paths[i],
            #             "text": texts[i],
            #             "original_length": lengths[i] if lengths is not None else waveforms.shape[2] # 패딩 감안한 원래 길이
            #         })
            #         # 심볼릭 링크 생성 시 유사도 값 추가
            #         basename = os.path.basename(paths[i])
            #         similarity_str = f"{similarities[i]:.4f}"  # 유사도 값을 문자열로 변환 (소수점 4자리까지)
            #         text_str = f"{texts[i]}"
            #         target_link = os.path.join("output", f"{similarity_str}_{text_str}_{basename}") # 파일 이름에 유사도 추가

            #         if not os.path.exists(target_link):
            #             os.symlink(os.path.abspath(paths[i]), target_link)
            # pbar.update(1)
    return results

In [4]:
import torch
import torch.nn.functional as F
from torch.utils.data import DataLoader
from transformers import ClapModel, ClapProcessor

# 모델 및 프로세서 로드
device = "cuda" if torch.cuda.is_available() else "cpu"
model = ClapModel.from_pretrained("laion/clap-htsat-unfused").to(device).eval()
print("Clap Loaded")
processor = ClapProcessor.from_pretrained("laion/clap-htsat-unfused")
print("ClapProcessor Loaded")

  from .autonotebook import tqdm as notebook_tqdm


Clap Loaded
ClapProcessor Loaded


In [5]:
import pandas as pd
recreate_output_folder(output_path)

# 데이터셋 및 데이터로더 설정
dataset = CLAPDataset("/data/dataset/stage1_valid_10.json", processor, max_annotations=1000)
dataloader = DataLoader(dataset, collate_fn=pad_collate_fn, batch_size=8, num_workers=8)

print("DataLoader Loaded")
df = pd.DataFrame(process(device, model, processor, dataloader, similarity_threshold=0.1)).sort_values(by="similarity")

# 유사도 낮은 순으로 정렬
print(df.head())

# DataFrame 저장 (선택적으로 csv, pickle, 등 다른 형식으로 저장 가능)
df.to_csv("results.csv", index=False)  # csv 파일로 저장


기존 output 폴더(output)를 삭제했습니다.
새로운 output 폴더(output)를 생성했습니다.
DataLoader Loaded


Processing Batches:   0%|          | 0/125 [00:02<?, ?it/s]


ValueError: Unable to create tensor, you should probably activate truncation and/or padding with 'padding=True' 'truncation=True' to have batched tensors with the same length. Perhaps your features (`input_ids` in this case) have excessive nesting (inputs type `list` where type `int` is expected).