# 데이터 증강

https://www.gyan.dev/ffmpeg/builds/ffmpeg-git-essentials.7z  
이 사이트를 통해 AudioSegment 파일 다운

In [None]:
# ogg 파일로 저장할때, 오류를 방지하기 위한 라이브러리
from pydub import AudioSegment
AudioSegment.converter = r"./ffmpeg-2024-07-10-git-1a86a7a48d-essentials_build/bin/ffmpeg.exe"
AudioSegment.ffmpeg = r"./ffmpeg-2024-07-10-git-1a86a7a48d-essentials_build/bin/ffmpeg.exe"
AudioSegment.ffprobe =r"./ffmpeg-2024-07-10-git-1a86a7a48d-essentials_build/bin/ffprobe.exe"

In [None]:
import os
import pandas as pd
from tqdm import tqdm
from tqdm.auto import trange
import random 
import librosa
import numpy as np
import soundfile as sf
import io
import copy
import csv
from scipy import signal
from scipy.signal import chirp

In [None]:
# 오디오 파일 로드
def load_audio(file_path, sr=None):
    y, sr = librosa.load(file_path, sr=sr)  # Preserve the original sample rate
    return y, sr


# 오디오 저장 (WAV로 저장 후 OGG로 변환하고, WAV 파일 삭제)
def save_audio(file_path, audio, sr):
    # NaN 값이나 무한대 값 제거
    audio = np.nan_to_num(audio)
    #sf.write(file_path, audio, sr, format='ogg') # 빠르지만 오류 발생
    
    # NumPy 배열을 int16으로 변환
    audio_int16 = (audio * 32767).astype(np.int16)
    
    # BytesIO 객체 생성
    byte_io = io.BytesIO()
    
    # WAV 파일로 먼저 저장 (메모리에)
    AudioSegment(
        audio_int16.tobytes(),
        frame_rate=sr,
        sample_width=2,
        channels=1
    ).export(byte_io, format="wav")
    
    # WAV를 OGG로 변환
    byte_io.seek(0)
    wav_audio = AudioSegment.from_wav(byte_io)
    wav_audio.export(file_path, format="ogg")
    

# 증강 적용 여부 결정
def should_apply(probability=0.5):
    return random.random() < probability

In [None]:
error_files = []

# CSV 파일 로드
file_path = './train.csv'
data = pd.read_csv(file_path)

# 증강 작업 수행
for index, row in tqdm(data.iterrows(), total=data.shape[0], desc="Processing files"):
    file_path = row['path']
    new_file_name = row['id'] + '_a1.ogg'
    output_path = os.path.join('./train_augmented1/', new_file_name)
    # train_augmentted1: nosie 및 음량 조절 추가 => 데이터 증강 
    os.makedirs(os.path.dirname(output_path), exist_ok=True)
    
    if not os.path.exists(file_path):
        print(f"File {file_path} does not exist. Skipping.")
        continue
    
    # 오디오 로드
    audio, sr = load_audio(file_path)

    target_samples = 5 * sr
    if len(audio) < target_samples:
        padding_length = target_samples - len(audio)
        if random.choice([True, False]):
            # 앞에 공백 추가
            audio = np.concatenate((np.zeros(padding_length), audio))
        else:
            # 뒤에 공백 추가
            audio = np.concatenate((audio, np.zeros(padding_length)))
    
    try:
        # 증강된 오디오 저장
        save_audio(output_path, audio, sr)
    except Exception as e: 
        error_files.append(file_path)
        print(f"Error processing {file_path}: {e}")

print("Finish.") 

In [None]:
# Load the CSV file
file_path = './train.csv'
df = pd.read_csv(file_path)

# Make a copy of the dataframe and modify the id and path columns
df_copy = df[~df['path'].isin(error_files)].copy()
df_copy['id'] = df_copy['id'] + '_a1'
df_copy['path'] = df_copy['path'].apply(lambda x: x.replace('./train/', './train_augmented1/').replace('.ogg', '_a1.ogg'))

# Display the first few rows of the modified dataframe
print(df_copy.head())

# Save the modified dataframe to a new CSV file
new_file_path = './train_augmented1.csv'
df_copy.to_csv(new_file_path, index=False)

print(f"Modified CSV file saved as: {new_file_path}")


# 2명 목소리 
1. 두 데이터를 합쳐 2개의 하나의 오디오 생성

In [None]:
# [기존 MixUp Code]
# 두 개의 오디오 파일을 합치는 함수
def mix_audios(audio1, audio2, sr, mix_ratio_range=(0.3, 0.7),target_length=5):
    mix_ratio = random.uniform(*mix_ratio_range)
    
    # 두 오디오의 길이를 맞춤
    max_len = max(len(audio1), len(audio2))
    audio1 = librosa.util.fix_length(audio1, size=max_len)
    audio2 = librosa.util.fix_length(audio2, size=max_len)
    
    # 오디오 믹싱
    mixed_audio = (audio1 * mix_ratio) + (audio2 * (1 - mix_ratio))
    # 최종 결과의 소리가 5초보다 짧으면 공백을 추가
    target_samples = target_length * sr
    if len(mixed_audio) < target_samples:
        padding_length = target_samples - len(mixed_audio)
        if random.choice([True, False]):
            # 앞에 공백 추가
            mixed_audio = np.concatenate((np.zeros(padding_length), mixed_audio))
        else:
            # 뒤에 공백 추가
            mixed_audio = np.concatenate((mixed_audio, np.zeros(padding_length)))
    return mixed_audio


# 파일 경로 설정
file_path2 = './train.csv'

new_data = []

# CSV 파일 로드
data = pd.read_csv(file_path2)

for index, row in tqdm(data.iterrows(), total=data.shape[0], desc="Processing files"):
    file_path = row['path']
    original_id = row['id']
    original_label = row['label']
    new_file_name = original_id + '_a3.ogg'
    output_path = os.path.join('./train_augmented3/', new_file_name)
    os.makedirs(os.path.dirname(output_path), exist_ok=True)
    
    if not os.path.exists(file_path):
        print(f"File {file_path} does not exist. Skipping.")
        continue
    
    # 기존 오디오 로드
    audio1, sr = load_audio(file_path)
    
    # 랜덤하게 두 번째 오디오 파일 선택 및 로드
    second_audio_row = data.sample(n=1).iloc[0]
    second_audio_path = second_audio_row['path']
    second_audio_id = second_audio_row['id']
    second_audio_label = second_audio_row['label']
    audio2, _ = load_audio(second_audio_path, sr=sr)
    
    # 두 오디오 믹싱
    mixed_audio = mix_audios(audio1, audio2, sr)    
    
    try:
        # 증강된 오디오 저장
        save_audio(output_path, mixed_audio, sr)
        
        # 새로운 라벨 생성
        if original_label == 'real' and second_audio_label == 'real':
            new_label = 'real'
        elif original_label == 'fake' and second_audio_label == 'fake':
            new_label = 'fake'
        else:
            new_label = 'fakereal'
        
        # 새로운 CSV 데이터에 추가
        new_data.append({
            'id': f"{original_id}_{second_audio_id}_a3",
            'path': output_path,
            'label': new_label
        })
        
    except Exception as e:
        print(f"Error processing {file_path}: {e}/n")

print("Audio processing finished.")

# 새로운 CSV 파일 생성
new_df = pd.DataFrame(new_data)
new_csv_path = './train_augmented3.csv'
new_df.to_csv(new_csv_path, index=False)
print(f"New CSV file created at: {new_csv_path}")

print("All processes completed.")

# [0,0] Label 생성


In [None]:
# 디렉토리 생성
output_dir = "./train_noise"
os.makedirs(output_dir, exist_ok=True)

# CSV 파일 생성 및 헤더 작성
csv_file = "train_noise.csv"
with open(csv_file, 'w', newline='') as file:
    writer = csv.writer(file)
    writer.writerow(["id", "path", "label"])

def generate_noise(duration, fs):
    t = np.linspace(0, duration, int(duration * fs), False)
    noise_type = np.random.choice(['white', 'pink', 'brown', 'blue', 'violet', 'impulse', 'harmonic', 'ambient'])
    
    if noise_type == 'white':
        noise = np.random.normal(0, 1, int(duration * fs))
    elif noise_type == 'pink':
        noise = pink_noise(int(duration * fs))
    elif noise_type == 'brown':
        noise = brown_noise(int(duration * fs))
    elif noise_type == 'blue':
        noise = blue_noise(int(duration * fs))
    elif noise_type == 'violet':
        noise = violet_noise(int(duration * fs))
    elif noise_type == 'impulse':
        noise = impulse_noise(int(duration * fs))
    elif noise_type == 'harmonic':
        noise = harmonic_noise(t, fs)
    elif noise_type == 'ambient':
        noise = ambient_noise(int(duration * fs))
    
    # 랜덤 진폭 조절
    amplitude = np.random.uniform(0.1, 1.0)
    noise *= amplitude
    
    # 랜덤 진폭 변조
    if np.random.random() < 0.3:
        mod_freq = np.random.uniform(1, 10)
        noise *= (1 + 0.5 * np.sin(2 * np.pi * mod_freq * t))
    
    # 랜덤 클리핑
    if np.random.random() < 0.2:
        clip_level = np.random.uniform(0.5, 0.9)
        noise = np.clip(noise, -clip_level, clip_level)
    
    return noise

def pink_noise(size):
    x = np.random.randn(size)
    X = np.fft.rfft(x)
    S = np.sqrt(np.arange(X.size)+1.)
    y = np.fft.irfft(X/S, size)
    return y / y.std()

def brown_noise(size):
    x = np.random.randn(size)
    return np.cumsum(x) / size

def blue_noise(size):
    x = np.random.randn(size)
    X = np.fft.rfft(x)
    S = np.sqrt(np.arange(X.size))
    y = np.fft.irfft(X*S, size)
    return y / y.std()

def violet_noise(size):
    x = np.random.randn(size)
    X = np.fft.rfft(x)
    S = np.arange(X.size)
    y = np.fft.irfft(X*S, size)
    return y / y.std()

def impulse_noise(size):
    noise = np.zeros(size)
    num_impulses = np.random.randint(5, 50)
    impulse_positions = np.random.choice(size, num_impulses, replace=False)
    noise[impulse_positions] = np.random.uniform(-1, 1, num_impulses)
    return noise

def harmonic_noise(t, fs):
    num_harmonics = np.random.randint(3, 10)
    fundamental_freq = np.random.uniform(50, 500)
    noise = np.zeros_like(t)
    for i in range(1, num_harmonics + 1):
        amplitude = 1 / i
        noise += amplitude * np.sin(2 * np.pi * i * fundamental_freq * t)
    return noise / np.max(np.abs(noise))

def ambient_noise(size):
    noise = np.random.randn(size)
    for _ in range(np.random.randint(3, 8)):
        freq = np.random.uniform(100, 2000)
        q = np.random.uniform(1, 5)
        noise = apply_bandpass_filter(noise, freq, q, fs)
    return noise / np.max(np.abs(noise))

def apply_bandpass_filter(data, center_freq, q, fs):
    w0 = center_freq / (fs/2)
    b, a = signal.iirpeak(w0, q)
    return signal.lfilter(b, a, data)
def generate_chirp(duration, fs):
    t = np.linspace(0, duration, int(fs * duration), False)
    
    # 시작 주파수를 20Hz에서 500Hz 사이에서 랜덤하게 선택
    f0 = random.uniform(20, 500)
    
    # 종료 주파수를 시작 주파수보다 높게, 최대 4000Hz까지 랜덤하게 선택
    f1 = random.uniform(f0, min(4000, fs/2))
    
    # chirp 메서드를 랜덤하게 선택 (linear, quadratic, logarithmic, hyperbolic)
    method = random.choice(['linear', 'quadratic', 'logarithmic', 'hyperbolic'])
    
    return chirp(t, f0=f0, f1=f1, t1=duration, method=method)

def apply_mask(audio, fs, min_masks=1, max_masks=5, min_mask_duration=0.1, max_mask_duration=0.5):
    audio_length = len(audio)
    num_masks = random.randint(min_masks, max_masks)
    
    for _ in range(num_masks):
        mask_duration = random.uniform(min_mask_duration, max_mask_duration)
        mask_length = int(mask_duration * fs)
        
        # 마스크 시작 위치를 랜덤하게 선택 (오디오 끝에서 마스크 길이를 뺀 범위 내에서)
        start = random.randint(0, audio_length - mask_length)
        
        # 마스크 적용 (0으로 설정)
        audio[start:start+mask_length] = 0
    
    return audio

def save_audio_ogg(audio, fs, filename):
    sf.write(filename, audio, fs, format='ogg', subtype='vorbis')
    return filename

def generate_mixed_audio(duration, fs):
    # 기본 오디오 초기화 (무음)
    audio = np.zeros(int(duration * fs))
    
    # 무음 (이미 기본으로 적용되어 있음)
    if np.random.random() < 0.05:
        return audio
    
    # 노이즈
    if np.random.random() < 0.7:
        noise = generate_noise(duration, fs)
        audio += noise * np.random.uniform(0.1, 1.0)  # 랜덤 진폭으로 추가
    
    # 처프 신호
    if np.random.random() < 0.4:
        chirp = generate_chirp(duration, fs)
        audio += chirp * np.random.uniform(0.1, 1.0)  # 랜덤 진폭으로 추가
    
    # 정규화 (클리핑 방지)
    if np.max(np.abs(audio)) > 1:
        audio = audio / np.max(np.abs(audio))
    
    # 마스크 적용
    if np.random.random() < 0.6:
        audio = apply_mask(audio, fs, min_masks=1, max_masks=3, min_mask_duration=0.1, max_mask_duration=0.3)
    
    return audio

# 파일 생성 및 CSV 작성
fs = 16000  # 샘플링 레이트
duration = 5  # 오디오 길이 (초)
num_files = 100  # 생성할 파일 수
for i in trange(num_files):
    audio = generate_mixed_audio(duration, fs)
    
    filename = f"audio_{i+1:03d}.ogg"
    filepath = os.path.join(output_dir, filename)
    save_audio_ogg(audio, fs, filepath)
    
    # CSV에 데이터 추가
    with open(csv_file, 'a', newline='') as file:
        writer = csv.writer(file)
        writer.writerow([filename, f"./train_noise/{filename}", "silence"])
