<a href="https://colab.research.google.com/github/UiinKim/Music_Generation/blob/main/audio_transform(audiolm_autoencoder).ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install audioldm



In [2]:
#audiolm/variational_autoencoder/autoencoder

import torch
from audioldm.latent_diffusion.ema import *
from audioldm.variational_autoencoder.modules import Encoder, Decoder
from audioldm.variational_autoencoder.distributions import DiagonalGaussianDistribution

from audioldm.hifigan.utilities import get_vocoder, vocoder_infer

class AutocoderKL(nn.Module):
    def __init__(
        self,
        ddconfig=None, #인코더, 디코더의 설정 정보
        lossconfig=None, #손실함수 설정 정보
        image_key='fbank', #입력 데이터의 유형 설정. fbank(필터뱅크),stft(단시간 푸리에 변환) --> 바꿔보기
        embed_dim=None, #latent space의 차원
        time_shuffle=1,
        subband=1, #subband 분해를 제어, 다양한 주파수 대역을 분리하여 처리 가능
        ckpt_path=None, #모델 가중치를 로드하기 위한 체크포인트 파일 경로
        reload_from_ckpt=None,
        ignore_keys=[],
        colorize_nlabels=None,
        monitor=None,
        base_learning_rate=1e-5,
        scale_factor=1
  ):
        super().__init__()

        self.encoder=Encoder(**ddconfig)
        self.decoder=Decoder(**ddconfig)

        self.subband=int(subband)

        if self.subband>1:
          print("Use subband decomposition %s" % self.subband)

        #양자화 컨볼루션 : latent vecotr로 변환하기 위한 컨볼루션 레이어
        self.quant_conv=torch.nn.Conv2d(2*ddconfig['z_channels'],2*embed_dim,1) #encoder에서 잠재표현으로
        self.post_quant_conv=torch.nn.Conv2d(embed_dim, ddconfig["z_channels"], 1) #잠재표현에서 decoder로

        self.vocoder=get_vocoder(None, 'cpu') #HiFi-GAN을 사용하여 스펙트로그램과 같은 오디오 표현을 파형으로 변환하는 역할인 vocoder을 초기화
        self.embed_dim=embed_dim

        if monitor is not None: #모니터링 옵션 설정
          self.monitor=monitor

        self.time_shuffle=time_shuffle
        self.reload_from_ckpt=reload_from_ckpt
        self.reloaded=False
        self.mean, self.std=None, None

        self.scale_factor=scale_factor

    def encode(self, x):
      #x=self.time_shuffle_operation(x)
      x=self.freq_split_subband(x) #서브밴드 분해가 활성화일 경우, input audio feature을 개별 주파수 밴드로 나눈다
      h=self.encoder(x) #인코더로 input data 처리
      moments=self.quant_conv(h) #양자화 컨볼루션을 통해 가우시안 분포의 모멘트(평균과 로그 분산) 생성
      posterior=DiagonalGaussianDistribution(moments) #모멘트로부터 latent vector샘플링
      return posterior

    def decode(self, z):
      z=self.post_quant_conv(z) #양자화 후 컨볼루션으로 latent vector z를 디코더가 처리할 수 있는 형태로 변환
      dec=self.decoder(z) #디코더를 사용하여 latent space-> 원래 데이터로 복원
      dec=self.freq_merge_subband(dec) #subband 분해가 수행된 경우, 주파수 밴드를 병합하여 전체 주파수 표현을 복원
      return dec

    def decode_to_waveform(self, dec):
      dec=dec.squeeze(1).permute(0,2,1) #디코더의 output의 형태를 vocoder에 맞게 조정
      wav_reconstruction=vocoder_infer(dec, self.vocoder) #보코더를 사용하여 dec을 파형으로 변환
      return wav_reconstruction

    def forward(self, input, sample_posterior=True):
      posterior=self.encode(input) #input을 인코딩하여 posterior 분포를 얻는다
      #파라미터에 대한 확률 분포
      #잠재 벡터를 샘플링하거나 모드 값 사용
      if sample_posterior:
        z=posterior.sample() #posterior분포에 대해 표본을 생성
      else:
        z=posterior.mode() #확률 분포에서 가장 확률이 높은 값

      if self.flag_first_run: #초기데이터 불러러오기
        print("Latent size: ", z.size())
        self.flag_first_run=False

      dec=self.decode(z) #잠재 벡터를 디코딩하여 데이터 복원

      return dec, posterior #복원 데이터와 posterior 분포 반환

    def freq_split_subband(self, fbank): #fbank(stft와 같은 스펙트로그램을 나타내는 텐서)를 입력받아 특정 조건에 따라 여러 개의 subband로 나누어 반환
      if self.subband==1 or self.image_key != "stft": #subband 처리가 필요한지 확인 -> 1이면 subband로 나눌 필요가 없으며 stft가 아닌 경우에도 필요 없음
      #self.subband는 subband의 수, image_key는 신호 처리의 유형
        return fbank

      bs, ch, tstep, fbins=fbank.size() #fbank의 각 텐서의 크기 반환
      #batch_size, channels, timesteps, frequency bins

      assert fbank.size(-1) % self.subband==0 #subband로 나누기 위해서는 frequency bins의 수가 subband의 수로 균등하게 나눠져야 함
      assert ch == 1 #이 코드는 단일 채널 input에 대해서만 작동

      return (
          fbank.squeeze(1).reshape(bs, tstep, self.subbnad, fbins // self.subband).permute(0, 2, 1, 3)
          #chnnels의 차원이 1일때 해당 차원을 제거([bs, 1, tsteps, fbins]->[bs, tsteps, fbins])
          #.reshape로 fbank를 재구성하여 [bs, tstep, fbins] 형태의 텐서를 [bs, tsetp, self.subband, fbins//self.subband]로 재구성
          #.premute(0,2,1,3)로 차원 순서 변경하여 [bs, tstep, self.subband, fbins//self.subband]로 변환하여  배치와 subband 별로 구분된 데이터로 만들기
      )

    def freq_merge_subband(self, subband_fbank):
      if self.subband==1 or self.image_key != 'stft':
        return subband_fbank

      assert subband_fbank.size(1)==self.subband #channel dimension. self.subband와 크기(subband의 개수)가 같아야 함
      bs, sub_ch, tstep, fbins = subband_fbank.size() #각 fbank의 각 텐서 크기 반환
      return subband_fbank.permute(0, 2, 1, 3).reshape(bs, tstep, -1).unsqueeze(1)
      #다시 순서를 [bs, tstep, sub_ch, fbins]로 변경
      #.reshape으로 sub_ch과 fbins를 곱하여 sub_ch*fbins로 나타나짐(subband가 하나의 큰 주파수로 합쳐짐)
      #.unsqueeze로 새로운 차원을 추가하여 [bs, 1, tstep, sub_ch*fbins]로 만들어 4차원 텐서로 다시 반환 -> 입력 텐서와 같은 형태

    #모델의 파라미터가 위치한 디바이스(gpu or cpu)를 반환
    def device(self):
      return next(self.parameters()).device

    @torch.no_grad() #이 함수 내에서 실행되는 모든 연산에 대해 gradient를 계산하지 않도록 (모델의 추론단계 사용)
    #인코딩 method의 gradient가 없는 버전으로 inference할때 gradient 사용 x
    def encode_first_stage(self, x):
      return self.encode(x)

    @torch.no_grad()
    def decode_first_stage(self, z, predict_cids=False, force_not_quantize=False):
    #z는 입력 텐서(모델의 디코딩 대상), predict_cids는 특정 단계 'z'를 처리할지 여부를 결정, force_not_quantize는 양자화를 강제로 비활성화할지 여부
      if predict_cids: #z를 처리하는 경우
        if z.dim() == 4: #z가 4차원의 텐서인지 확인
          z = torch.argmax(z.exp(), dim=1).long() #z의 지수함수에서 가장 큰 값을 가지는 인덱스를 반환하여 long타입으로 변환
        z = self.first_stage_model.quantize.get_codebook_entry(z, shape=None) #z를 양자화된 코드북으로 변환 (벡터 양자화)
        z = z.permute(0, 3, 1, 2).contiguous() #z의 차원을 재배열하고 메모리 상에서 연속적인 형태로 만든다.

      z = 1.0 / self.scale_factor * z #z의 크기를 조정하는 과정, 모델의 학습 및 추론에서 사용된 스케일링을 되돌리기
      return self.decode(z) #z 디코딩

    def get_first_stage_encoding(self, encoder_posterior): #잠재변수 입력인 encoder_posterior의 유형에 따라 함수가 다르게 동작
      if isinstance(encoder_posterior, DiagonalGaussianDistribution): #가우시안 분포 타입인 경우
        z = encoder_posterior.sample() #sampling된 값을 z에 할당
      elif isinstance(encoder_posterior, torch.Tensor): #텐서인 경우
        z = encoder_posterior #입력 그대로를 사용
      else: #두 가지 외의 타입이면 에러 발생
        raise NotImplementedError(
            f"encoder_posterior of type '{type(encoder_posterior)}' not yet implemented"
        )
      return self.scale_factor * z #z를 스케일링(잠재 변수 크기 조정)

In [4]:
#audiolm/variational_autoencoder/distributions
import torch
import numpy as np

class AbstractDistribution:
  def sample(self): #샘플 생성 기능, 다른 클래스 상속
    raise NotImplementedError()

  def mode(self): #가장 가능성이 높은 값을 반환, 다른 클래스 상속
    raise NotImplementedError()


class DiracDistribution(AbstractDistribution): #샘플링이 항상 같은 값 반환
  def __init__(self, value):
    self.value=value

  def sample(self):
    return self.value

  def mode(self):
    return self.value


class DiagonalGaussianDistribution(object): #가우시안 분포의 대각선 형태 -> VAE or 다른 베이지안 모델에서 잠재 변수 분포 모델링
  def __init__(self, parameters, deterministic=False):
  #parameters : 가우시안 분포의 평균과 분산 계산에 사용
  #deterministic : True로 설정되면 무작위성을 제거하고 항상 동일한 결과 반환
    self.parameters = parameters
    self.mean, self.logvar = torch.chunk(parameters, 2, dim=1) #parameter을 mean과 logvar 2개로 분리, dim=1은 텐서의 두번째 차원에서 나눔
    self.logvar = torch.clamp(self.logvar, -30.0, 20.0) #logvar은 로그 스케일에서의 분산을 의미 -> 이 값을 통해 분산과 표준편차를 계산할 수 있음
    #로그 분산 값이 너무 작거나 커지는 것을 방지(극단적인 값)
    self.deterministic = deterministic #determinisitc 상속
    self.std = torch.exp(0.5 * self.logvar) # 0.5*self.logvar로 로그 스케일에서 표준편차에 해당하는 값을 계산하고 torch.exp를 적용하여 원래 스케일로 변환
    self.var = torch.exp(self.logvar) #로그 분산을 지수함수로 변환하여 분산 값 계산
    if self.dterministic: #True일 경우 분산(self.var)과 표준편차(self.std)를 0으로 설정
      self.var = self.std = torch.zeros_like(self.mean).to( #mean과 동일한 크기의 텐서를 0으로 채워 생성, parameter과 동일한 디바이스에 저장 -> 무작위성 제거, 샘플링시 평균값만 반환
          device=self.parameters.device
      )

  def sample(self): #가우시안 분포의 평균과 표준편차로 무작위 샘플 생성 -> 잠재변수 샘플링(VAE 모델에서의)에 사용됨
    x = self.mean + self.std * torch.randn(self.mean.shape).to(
        #torch.randn(self.mean.shape)은 평균이 0이고 분산이 1인 표준 정규 분포에서 랜덤 샘플 생성 -> 분포의 표준 편차로와 곱하여 조정하고 평균을 더하여 원하는 가우시안 분포에서의 샘플 생성
        device=self.parameters.device
    )
    return x

  def kl(self, other=None):
    if self.deterministic: #determinisitic이 True이면 무작위성이 없는 결정론적 분포
      return torch.Tensor([0.0]) #KL발산은 0으로 설정
    else:
      if other is None: #deterministic이 False이고 other이 None일 경우
        return 0.5*torch.mean(
            torch.pow(self.mean, 2)+self.var-1.0-self.logvar,
            #torch.pow(self.mean, 2) : 평균의 제곱을 하여 표준 정규 분포와 비교할 때 현재 분포의 중심에서 얼마나 벗어나있는지 측정
            #현재 분포의 분산에서 표준 정규의 분산인 1을 빼고 로그 분산을 빼서 비슷한 스케일로 표현
            dim=[1,2,3],
            #이후 각 0차원을 제외한 1차원, 2차원, 3차원에서의 평균을 계산하여 스칼라 값이 된다.
        ) #최종적으로 KL발산에 0.5를 곱하여 반환
      else:
        return 0.5*torch.mean(
            torch.pow(self.mean-other.mean, 2) / other.var #현재 분포와 other의 분포의 평균의 차이를'other'의 분산으로 나누어 정규화 -> 두 분포의 중심이 얼마나 떨어져 있는지 나타냄
            + self.var / other.var #현재 분포의 분산을 'other' 분포의 분산으로 나누어 두 분포의 스케일 차이를 반영
            - 1.0 #KL발산의 공식에서 등장하는 상수(표준 정규 분포의 분산인 1을 빼준다)
            - self.logvar + other.logvar, #두 분포의 로그 분산의 차이를 반영. 로그 분산 차이는 분포의 스케일(분산, 표준편차) 차이를 반영
            dim=[1,2,3],  #결과의 평균을 계산하여 KL 발산 값을 배치 차원과 공간 차원에서 평균화
        ) #최종적으로 KL발산 값에 0.5를 곱하여 반환

  def nll(self, sample, dims=[1,2,3]): #가우시안 분포에서의 Negative Log-Likelihood 계산
  #NLL은 주어진 데이터가 특정 분포에 따라 관찰될 가능성의 대칭적인 척도 -> 주어진 샘플이 해당 분포로부터 얼마나 가능성이 있는지 측정하는 함수
  #sample : NLL을 계산할 샘플(데이터)이며 self.mean과 self.var로 정의된 가우시안 분포에 대해 평가됨
  #dims=[1,2,3]은 4차원 텐서에서 첫번째 차원을 제외한 모든 차원
    if self.determinisitic: #deterministic이 True일 경우 NLL을 0으로 반환
      return torch.Tensor([0.0])
    logtwopi=np.log(2.0 * np.pi) #log(2ㅠ)를 계산한 값 -> 가우시안 분포의 NLL 계산에서 상수항으로 사용
    return 0.5 * torch.sum(
        logtwopi + self.logvar + torch.pow(sample - self.mean, 2) / self.var,
        #log(2ㅠ)상수항 + 현재 분포의 로그분산 + 샘플과 분포의 평균 사이의 제곱 오차를 분산으로 나눈 값
        dim=dims, #1차원인 배치 차원을 제외한 각 차원에 대해 NLL 값을 계산하여 [batch_size]크기의 텐서를 반환 -> batch_size개의 3개의 NLL 값
    ) #0.5를 곱하여 가우시안 분포의 NLL식을 완성

  def mode(self):
    return self.mean


#두 개의 가우시안 분포 사이의 KL발산을 계산(두 확률 분포 간의 차이를 측정)
def normal_kl(mean1, logvar1, mean2, logvar2): #첫번째 가우시안 분포의 평균과 로그분산, 두번째 가우시안 분포의 평균과 로그분산
  """
  source : https://github.com/openai/guided-diffusion/blob/27c20a8fab9cb472df5d6bdd6c8d11c8f430b924/guided_diffusion/losses.py#L12
  Compute the KL divergence between two gaussians.
  Shapes are automatically broadcasted, so batches can be compared to
  scalars, among other use cases.
  """
  #이 함수는 두 가우시안 분포 간의 KL 발산을 계산하여, 그 값을 반환합니다. 로그 분산이 제공된다는 점에서, 이 함수는 로그 스케일에서 분산을 다루고 있습니다.

  #네 개의 매개변수(mean1, logvar1, mean2, logvar2) 중 하나가 반드시 텐서 객체인지 확인하는 코드
  tensor = None #초기에는 None으로 설정
  for obj in (mean1, logvar1, mean2, logvar2):
    if isinstance(obj, torch.Tensor): #텐서 객체가 발견되면 그 객체를 tensor 변수에 할당
      tensor=obj
      break
  assert tensor is not None, "at least one argument must be a Tensor" #최소 하나의 input이 텐서여야 하며 아니면 에러 출력

  # Force variances to be Tensors. Broadcasting helps convert scalars to
  # Tensors, but it does not work for torch.exp().
  logvar1, logvar2 = [
      x if isinstance(x, torch.Tensor) else torch.tensor(x).to(tensor) for x in (logvar1, logvar2)
      #logvar1, logvar2가 텐서가 아닌 경우 텐서로 변환하고 tensor 변수가 있는 동일한 장치(gpu, cpu)로 이동
  ]

  return 0.5 * ( #두 가우시안 분포 사이의 KL발산을 계산하는 수식
      - 1.0
      + logvar2
      - logvar1 #로그 분산 비율을 계산
      + torch.exp(logvar1 - logvar2) #두 분포의 분산 비율을 계산
      + ((mean1 - mean2) ** 2) * torch.exp(-logvar2) #두 분포의 평균 차이에 대한 비율을 계산
  )