# CTGAN과 같은 모델에서 기본적으로 사용되는 기능들을 제공하는 함수들

In [None]:
"""BaseSynthesizer module."""

import contextlib
import numpy as np
import torch

@contextlib.contextmanager
def set_random_states(random_state, set_model_random_state):
    """랜덤 상태를 관리하기 위한 컨텍스트 매니저.
    
    인자:
        random_state (int 또는 tuple):
            랜덤 시드 혹은 (numpy.random.RandomState, torch.Generator) 튜플.
        set_model_random_state (function):
            모델의 랜덤 상태를 설정하는 함수.
    """
    # 기존 numpy와 torch의 랜덤 상태를 저장
    original_np_state = np.random.get_state()
    original_torch_state = torch.get_rng_state()

    # 전달받은 튜플에서 numpy와 torch의 랜덤 상태 객체를 분리
    random_np_state, random_torch_state = random_state

    # 전달받은 랜덤 상태를 적용 (각 객체의 내부 상태를 가져와서 설정)
    np.random.set_state(random_np_state.get_state())
    torch.set_rng_state(random_torch_state.get_state())

    try:
        yield  # 컨텍스트 블록 실행
    finally:
        # 컨텍스트 종료 시점에 현재의 numpy와 torch의 랜덤 상태를 새로 저장
        current_np_state = np.random.RandomState()
        current_np_state.set_state(np.random.get_state())
        current_torch_state = torch.Generator()
        current_torch_state.set_state(torch.get_rng_state())
        # 모델의 랜덤 상태를 업데이트: 현재의 상태를 모델에 반영
        set_model_random_state((current_np_state, current_torch_state))

        # 원래 저장해두었던 랜덤 상태로 복구
        np.random.set_state(original_np_state)
        torch.set_rng_state(original_torch_state)

def random_state(function):
    """함수를 호출하기 전에 랜덤 상태를 설정하는 데코레이터.
    
    인자:
        function (Callable):
            데코레이트할 함수.
    """
    def wrapper(self, *args, **kwargs):
        # 모델에 랜덤 상태가 설정되어 있지 않으면 그냥 함수 실행
        if self.random_states is None:
            return function(self, *args, **kwargs)
        else:
            # 모델에 설정된 랜덤 상태를 적용한 상태로 함수 실행
            with set_random_states(self.random_states, self.set_random_state):
                return function(self, *args, **kwargs)
    return wrapper

class BaseSynthesizer:
    """CTGAN의 기본 합성기(Base Synthesizer) 클래스.
    
    CTGAN의 모든 기본 합성기(데이터 생성기)의 공통 기능을 제공.
    """
    # 클래스 변수: 모델의 랜덤 상태를 저장 (없으면 None)
    random_states = None

    def __getstate__(self):
        """객체를 피클링(pickle)할 때의 상태를 개선합니다.
        
        피클링 전에 CPU 디바이스로 변환하여 저장하고,
        random_states가 설정된 경우엔 그 상태를 generator가 아닌 dict 형식으로 저장합니다.
        
        반환:
            dict: 객체 상태를 나타내는 파이썬 딕셔너리.
        """
        # 현재 사용 중인 디바이스를 백업
        device_backup = self._device
        # 피클링 전에 CPU 디바이스로 변경
        self.set_device(torch.device('cpu'))
        # 객체의 상태를 딕셔너리로 복사
        state = self.__dict__.copy()
        # 원래 디바이스로 복구
        self.set_device(device_backup)
        # 만약 random_states가 (numpy.random.RandomState, torch.Generator) 튜플이면,
        # 각각의 상태를 딕셔너리로 저장하고 기존의 random_states 키는 제거
        if (
            isinstance(self.random_states, tuple) and
            isinstance(self.random_states[0], np.random.RandomState) and
            isinstance(self.random_states[1], torch.Generator)
        ):
            state['_numpy_random_state'] = self.random_states[0].get_state()
            state['_torch_random_state'] = self.random_states[1].get_state()
            state.pop('random_states')
        return state

    def __setstate__(self, state):
        """피클링된 객체의 상태를 복원합니다.
        
        상태 딕셔너리에 저장된 random_states가 있으면 복원하고,
        현재 하드웨어에 맞게 디바이스를 설정합니다.
        """
        # 만약 피클된 상태에 랜덤 상태가 저장되어 있으면 복원
        if '_numpy_random_state' in state and '_torch_random_state' in state:
            np_state = state.pop('_numpy_random_state')
            torch_state = state.pop('_torch_random_state')
            
            # 새 torch.Generator를 생성하고, 저장된 상태를 설정
            current_torch_state = torch.Generator()
            current_torch_state.set_state(torch_state)
            
            # 새 numpy.random.RandomState를 생성하고, 저장된 상태를 설정
            current_numpy_state = np.random.RandomState()
            current_numpy_state.set_state(np_state)
            state['random_states'] = (current_numpy_state, current_torch_state)
        
        # 복원된 상태를 객체에 할당
        self.__dict__ = state
        # 사용 가능한 디바이스(GPU/CPU)에 따라 디바이스 설정
        device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
        self.set_device(device)

    def save(self, path):
        """모델을 주어진 경로에 저장합니다.
        
        저장 전에 CPU 디바이스로 전환하여, 다양한 환경에서 로드할 수 있도록 합니다.
        
        인자:
            path (str): 저장할 파일 경로.
        """
        device_backup = self._device  # 현재 디바이스 백업
        self.set_device(torch.device('cpu'))  # CPU 모드로 전환
        torch.save(self, path)  # 모델 저장
        self.set_device(device_backup)  # 원래 디바이스로 복구

    @classmethod
    def load(cls, path):
        """주어진 경로에서 저장된 모델을 불러옵니다.
        
        인자:
            path (str): 저장된 모델 파일 경로.
        
        반환:
            모델 객체: 불러온 모델.
        """
        # 사용 가능한 디바이스에 따라 설정
        device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
        model = torch.load(path)  # 모델 불러오기
        model.set_device(device)  # 디바이스 재설정
        return model

    def set_random_state(self, random_state):
        """랜덤 상태를 설정합니다.
        
        인자:
            random_state (int, tuple, 또는 None):
                - int: numpy와 torch에 동일한 시드를 부여하여 생성.
                - tuple: (numpy.random.RandomState, torch.Generator) 형태.
                - None: 랜덤 상태를 설정하지 않음.
        """
        if random_state is None:
            self.random_states = random_state
        elif isinstance(random_state, int):
            # 정수 시드를 사용하여 numpy와 torch의 난수 생성기를 초기화
            self.random_states = (
                np.random.RandomState(seed=random_state),
                torch.Generator().manual_seed(random_state),
            )
        elif (
            isinstance(random_state, tuple) and
            isinstance(random_state[0], np.random.RandomState) and
            isinstance(random_state[1], torch.Generator)
        ):
            self.random_states = random_state  # 튜플 그대로 설정
        else:
            raise TypeError(
                f'`random_state` {random_state} expected to be an int or a tuple of '
                '(`np.random.RandomState`, `torch.Generator`)')


# DataSampler 함수 (이산형 변수 랜덤하게 선택한 후 로그확률로 카테고리 선택)

In [None]:
"""DataSampler module."""

import numpy as np

class DataSampler(object):
    """
    DataSampler는 CTGAN에서 조건부 벡터(conditional vector)와 이에 대응하는 데이터를 샘플링하는 역할을 합니다.
    이 모듈은 주어진 학습 데이터와 출력 정보(output_info)를 바탕으로,
    각 이산형(범주형) 컬럼별로 데이터 행 인덱스와 카테고리 확률을 미리 계산하여 저장하고,
    학습 또는 샘플링 시 조건부 벡터를 생성할 수 있도록 돕습니다.
    """

    def __init__(self, data, output_info, log_frequency):
        """
        DataSampler 초기화.
        
        인자:
            data (np.array): 학습 데이터 (2차원 넘파이 배열).
            output_info (list): 각 컬럼(열)의 변환 정보를 담은 리스트.
            log_frequency (bool): 이산형 변수 샘플링 시 로그 확률을 사용할지 여부.
        """
        self._data_length = len(data)  # 학습 데이터의 전체 행 개수 저장

        # 주어진 컬럼 정보가 이산형(범주형)인지 판별하는 함수
        def is_discrete_column(column_info):
            # 이산형 컬럼은 column_info가 하나의 원소를 가지며, 해당 원소의 활성화 함수가 'softmax'인 경우로 정의
            return (len(column_info) == 1 and column_info[0].activation_fn == 'softmax')

        # output_info에서 이산형 컬럼의 개수를 계산
        n_discrete_columns = sum(
            [1 for column_info in output_info if is_discrete_column(column_info)]
        )

        # 각 이산형 컬럼에 대한 조건 벡터의 시작 인덱스를 저장할 배열 초기화
        self._discrete_column_matrix_st = np.zeros(n_discrete_columns, dtype='int32')

        # _rid_by_cat_cols: 각 이산형 컬럼의 각 카테고리에 해당하는 행(row) 인덱스를 저장하는 리스트
        # 예: _rid_by_cat_cols[a][b]는 a번째 이산형 컬럼에서 값이 b인 모든 행의 인덱스 리스트
        self._rid_by_cat_cols = []

        # output_info를 기반으로 각 컬럼별로 행 인덱스를 저장
        st = 0  # 시작 인덱스
        for column_info in output_info:
            if is_discrete_column(column_info):
                span_info = column_info[0]
                ed = st + span_info.dim  # 이 컬럼의 끝 인덱스

                rid_by_cat = []
                # 각 카테고리에 대해 해당 열에서 값이 1인 행들의 인덱스 찾기
                for j in range(span_info.dim):
                    rid_by_cat.append(np.nonzero(data[:, st + j])[0])
                self._rid_by_cat_cols.append(rid_by_cat)
                st = ed  # 다음 컬럼을 위해 인덱스 업데이트
            else:
                # 연속형 변수인 경우, 해당 컬럼의 전체 차원만큼 인덱스 증가
                st += sum([span_info.dim for span_info in column_info])
        # 최종적으로 계산된 인덱스가 데이터의 열 수와 일치해야 함을 확인
        assert st == data.shape[1]

        # 조건 벡터를 샘플링할 때 사용할 카테고리 확률 정보를 저장할 배열들을 초기화
        # 최대 카테고리 수를 구함 (여러 이산형 컬럼 중 가장 큰 카테고리 수)
        max_category = max([
            column_info[0].dim
            for column_info in output_info
            if is_discrete_column(column_info)
        ], default=0)

        # 각 이산형 컬럼의 조건 벡터 시작 인덱스를 저장
        self._discrete_column_cond_st = np.zeros(n_discrete_columns, dtype='int32')
        # 각 이산형 컬럼의 카테고리 수를 저장
        self._discrete_column_n_category = np.zeros(n_discrete_columns, dtype='int32')
        # 각 이산형 컬럼의 카테고리별 확률을 저장하는 배열 (n_discrete_columns x max_category)
        self._discrete_column_category_prob = np.zeros((n_discrete_columns, max_category))
        self._n_discrete_columns = n_discrete_columns  # 전체 이산형 컬럼 수
        # 전체 카테고리 수 (모든 이산형 컬럼의 카테고리 수의 합)
        self._n_categories = sum([
            column_info[0].dim
            for column_info in output_info
            if is_discrete_column(column_info)
        ])

        # 각 이산형 컬럼에 대해 카테고리 발생 빈도 및 확률 계산
        st = 0
        current_id = 0  # 이산형 컬럼 인덱스 카운터
        current_cond_st = 0  # 조건 벡터 상에서의 시작 인덱스
        for column_info in output_info:
            if is_discrete_column(column_info):
                span_info = column_info[0]
                ed = st + span_info.dim
                # 해당 컬럼의 각 카테고리 발생 빈도 계산 (열 단위 합산)
                category_freq = np.sum(data[:, st:ed], axis=0)
                if log_frequency:
                    # log_frequency가 True이면, 로그 변환을 적용하여 확률 계산에 사용
                    category_freq = np.log(category_freq + 1)
                # 카테고리별 확률로 정규화
                category_prob = category_freq / np.sum(category_freq)
                # 계산된 확률을 저장 (해당 컬럼의 차원만큼)
                self._discrete_column_category_prob[current_id, :span_info.dim] = category_prob
                # 조건 벡터에서 해당 컬럼의 시작 인덱스 저장
                self._discrete_column_cond_st[current_id] = current_cond_st
                # 해당 컬럼의 카테고리 수 저장
                self._discrete_column_n_category[current_id] = span_info.dim
                # 다음 조건 벡터 시작 인덱스 업데이트
                current_cond_st += span_info.dim
                current_id += 1
                st = ed
            else:
                st += sum([span_info.dim for span_info in column_info])

    def _random_choice_prob_index(self, discrete_column_id):
        """
        주어진 이산형 컬럼에 대해 미리 계산된 카테고리 확률에 따라 확률적 샘플링을 수행합니다.
        
        인자:
            discrete_column_id (int): 샘플링할 이산형 컬럼의 인덱스.
        
        반환:
            각 샘플에 대해 선택된 카테고리 인덱스 배열.
        """
        probs = self._discrete_column_category_prob[discrete_column_id]
        # 각 카테고리에 대해 누적 확률을 구하기 위한 난수 생성 (각 카테고리마다 난수 생성 후 차원 확장)
        r = np.expand_dims(np.random.rand(probs.shape[0]), axis=1)
        # 누적 확률이 난수보다 커지는 첫 번째 인덱스를 선택
        return (probs.cumsum(axis=1) > r).argmax(axis=1)

    def sample_condvec(self, batch):
        """
        학습을 위한 조건 벡터(conditional vector)를 생성합니다.
        
        반환:
            cond (배치 x 전체 카테고리 수): 조건 벡터. 각 샘플마다 한 카테고리에 해당하는 위치에 1이 있음.
            mask (배치 x 이산형 컬럼 수): 각 샘플이 어떤 이산형 컬럼을 선택했는지를 나타내는 one-hot 벡터.
            discrete_column_id (배치): 각 샘플에 대해 선택된 이산형 컬럼의 ID.
            category_id_in_col (배치): 선택된 이산형 컬럼 내에서의 카테고리 인덱스.
        """
        if self._n_discrete_columns == 0:
            return None

        # 배치 내 각 샘플마다 무작위로 하나의 이산형 컬럼 선택
        discrete_column_id = np.random.choice(
            np.arange(self._n_discrete_columns), batch
        )

        # 조건 벡터와 마스크 초기화
        cond = np.zeros((batch, self._n_categories), dtype='float32')
        mask = np.zeros((batch, self._n_discrete_columns), dtype='float32')
        # 선택된 이산형 컬럼을 one-hot 방식으로 표시
        mask[np.arange(batch), discrete_column_id] = 1
        # 각 선택된 이산형 컬럼에 대해 카테고리 인덱스를 확률적으로 샘플링
        category_id_in_col = self._random_choice_prob_index(discrete_column_id)
        # 실제 전체 조건 벡터 상의 인덱스 계산:
        # 각 이산형 컬럼의 조건 벡터 시작 인덱스에 선택된 카테고리 인덱스를 더함
        category_id = (self._discrete_column_cond_st[discrete_column_id] + category_id_in_col)
        # 조건 벡터에서 해당 인덱스 위치에 1 할당
        cond[np.arange(batch), category_id] = 1

        return cond, mask, discrete_column_id, category_id_in_col

    def sample_original_condvec(self, batch):
        """
        원본 데이터 분포를 기반으로 조건 벡터를 샘플링합니다.
        이 방식은 전체 데이터의 카테고리 분포를 반영합니다.
        
        인자:
            batch (int): 배치 당 샘플 수.
        
        반환:
            cond (배치 x 전체 카테고리 수): 샘플링된 조건 벡터.
        """
        if self._n_discrete_columns == 0:
            return None

        # 전체 이산형 컬럼의 카테고리 확률을 1차원 배열로 변환 후, 0이 아닌 값만 선택
        category_freq = self._discrete_column_category_prob.flatten()
        category_freq = category_freq[category_freq != 0]
        # 정규화: 확률의 합이 1이 되도록 조정
        category_freq = category_freq / np.sum(category_freq)
        # 확률 분포를 기반으로 배치마다 카테고리 인덱스 샘플링
        col_idxs = np.random.choice(np.arange(len(category_freq)), batch, p=category_freq)
        # 조건 벡터 초기화
        cond = np.zeros((batch, self._n_categories), dtype='float32')
        # 각 샘플에 대해 선택된 인덱스 위치에 1 할당
        cond[np.arange(batch), col_idxs] = 1

        return cond

    def sample_data(self, data, n, col, opt):
        """
        주어진 조건 벡터를 만족하는 원본 데이터를 샘플링합니다.
        
        인자:
            data (np.array): 원본 학습 데이터.
            n (int): 샘플링할 행의 개수.
            col (list): 선택된 이산형 컬럼 정보 (컬럼 인덱스 리스트).
            opt (list): 각 컬럼에서 선택된 카테고리 인덱스 리스트.
        
        반환:
            np.array: 조건을 만족하는 n개의 데이터 행.
        """
        # 만약 조건(col)이 없다면, 무작위 인덱스로 데이터 샘플링
        if col is None:
            idx = np.random.randint(len(data), size=n)
            return data[idx]

        idx = []
        # 각 이산형 컬럼과 선택된 카테고리에 대해, 해당하는 행 인덱스 중 무작위 선택
        for c, o in zip(col, opt):
            idx.append(np.random.choice(self._rid_by_cat_cols[c][o]))
        return data[idx]

    def dim_cond_vec(self):
        """
        조건 벡터의 총 차원(전체 카테고리 수)을 반환합니다.
        이 값은 Generator에 조건 벡터 크기를 맞추기 위해 사용됩니다.
        """
        return self._n_categories

    def generate_cond_from_condition_column_info(self, condition_info, batch):
        """
        주어진 조건 정보를 기반으로 조건 벡터를 생성합니다.
        
        인자:
            condition_info (dict): 특정 컬럼과 값에 대한 정보.
                예: {'discrete_column_id': x, 'value_id': y}
            batch (int): 생성할 조건 벡터의 배치 크기.
        
        반환:
            vec (np.array): 조건 벡터 (배치 x 전체 카테고리 수).
        """
        vec = np.zeros((batch, self._n_categories), dtype='float32')
        # condition_info를 기반으로 이산형 컬럼의 조건 벡터 시작 인덱스에서 value_id를 더해
        # 조건 벡터 내에서 해당 위치를 찾음
        id_ = self._discrete_column_matrix_st[condition_info['discrete_column_id']]
        id_ += condition_info['value_id']
        vec[:, id_] = 1
        return vec


# 연속형 변수에 대해서 GMM으로 [-1.1]로 정규화 한 후 Tanh로 변환
# 이산형 변수에 대해서 Onehotencoder사용해 원-핫 벡터 사용

In [3]:
!pip install rdt



In [None]:
"""DataTransformer module."""

from collections import namedtuple

import numpy as np
import pandas as pd
from joblib import Parallel, delayed
from rdt.transformers import ClusterBasedNormalizer, OneHotEncoder

# 각 컬럼의 변환 결과에서 출력 차원(dim)과 활성화 함수 정보를 저장하는 namedtuple
SpanInfo = namedtuple('SpanInfo', ['dim', 'activation_fn'])
# 각 컬럼에 대한 변환 정보 (컬럼 이름, 컬럼 타입, 변환기 객체, 출력 정보, 최종 출력 차원)를 저장하는 namedtuple
ColumnTransformInfo = namedtuple(
    'ColumnTransformInfo', [
        'column_name', 'column_type', 'transform', 'output_info', 'output_dimensions'
    ]
)

class DataTransformer(object):
    """
    DataTransformer 클래스는 CTGAN에서 사용할 데이터를 전처리하기 위한 모듈입니다.
    
    연속형(continuous) 컬럼은 Bayesian GMM(ClusterBasedNormalizer)을 사용하여 정규화하고,
    [-1, 1] 범위의 스칼라 값과 one-hot 벡터를 생성합니다.
    이산형(discrete) 컬럼은 OneHotEncoder를 사용하여 원-핫 인코딩 방식으로 변환합니다.
    """

    def __init__(self, max_clusters=10, weight_threshold=0.005):
        """
        DataTransformer 객체를 초기화합니다.
        
        Args:
            max_clusters (int):
                Bayesian GMM에서 사용할 최대 Gaussian 분포 개수.
            weight_threshold (float):
                Gaussian 분포를 유지하기 위한 최소 가중치 임계값.
        """
        self._max_clusters = max_clusters
        self._weight_threshold = weight_threshold

    def _fit_continuous(self, data):
        """
        연속형 컬럼에 대해 Bayesian GMM 기반 정규화 모델을 학습합니다.
        
        Args:
            data (pd.DataFrame):
                변환할 하나의 연속형 컬럼을 포함한 DataFrame.
                
        Returns:
            ColumnTransformInfo:
                학습된 변환 정보를 담은 namedtuple.
                출력 정보는 첫 번째 값은 정규화된 스칼라(tanh 활성화)이고,
                두 번째 값은 해당 컬럼이 어느 Gaussian 컴포넌트에 속하는지를 나타내는 one-hot 벡터(softmax 활성화)입니다.
        """
        column_name = data.columns[0]
        gm = ClusterBasedNormalizer(
            missing_value_generation='from_column',
            max_clusters=min(len(data), self._max_clusters),
            weight_threshold=self._weight_threshold
        )
        gm.fit(data, column_name)
        # 유효한 Gaussian 컴포넌트의 수 계산
        num_components = sum(gm.valid_component_indicator)

        return ColumnTransformInfo(
            column_name=column_name,
            column_type='continuous',
            transform=gm,
            output_info=[SpanInfo(1, 'tanh'), SpanInfo(num_components, 'softmax')],
            output_dimensions=1 + num_components
        )

    def _fit_discrete(self, data):
        """
        이산형 컬럼에 대해 OneHotEncoder를 학습시킵니다.
        
        Args:
            data (pd.DataFrame):
                변환할 하나의 이산형 컬럼을 포함한 DataFrame.
        
        Returns:
            ColumnTransformInfo:
                학습된 OneHotEncoder와 변환 정보를 담은 namedtuple.
                출력 정보는 해당 컬럼의 카테고리 수와 softmax 활성화 함수를 사용합니다.
        """
        column_name = data.columns[0]
        ohe = OneHotEncoder()
        ohe.fit(data, column_name)
        num_categories = len(ohe.dummies)

        return ColumnTransformInfo(
            column_name=column_name,
            column_type='discrete',
            transform=ohe,
            output_info=[SpanInfo(num_categories, 'softmax')],
            output_dimensions=num_categories
        )

    def fit(self, raw_data, discrete_columns=()):
        """
        DataTransformer를 학습시켜, 연속형 컬럼과 이산형 컬럼의 변환기를 구성합니다.
        
        연속형 컬럼은 Bayesian GMM을, 이산형 컬럼은 OneHotEncoder를 사용합니다.
        이 과정에서 전체 출력 차원과 각 컬럼의 span 정보를 계산합니다.
        
        Args:
            raw_data (pd.DataFrame 또는 np.array):
                변환할 원본 데이터.
            discrete_columns (list):
                이산형 변수로 처리할 컬럼 목록 (DataFrame인 경우 컬럼 이름, numpy array인 경우 인덱스).
        """
        self.output_info_list = []
        self.output_dimensions = 0
        self.dataframe = True

        # raw_data가 DataFrame이 아닌 경우 DataFrame으로 변환
        if not isinstance(raw_data, pd.DataFrame):
            self.dataframe = False
            # 숫자형 컬럼 이름 문제 해결을 위해 모든 컬럼 이름을 문자열로 변환
            discrete_columns = [str(column) for column in discrete_columns]
            column_names = [str(num) for num in range(raw_data.shape[1])]
            raw_data = pd.DataFrame(raw_data, columns=column_names)

        # 각 컬럼의 원래 데이터 타입 저장
        self._column_raw_dtypes = raw_data.infer_objects().dtypes
        self._column_transform_info_list = []
        # 각 컬럼별로 변환기를 학습 (이산형 컬럼이면 _fit_discrete, 연속형이면 _fit_continuous)
        for column_name in raw_data.columns:
            if column_name in discrete_columns:
                column_transform_info = self._fit_discrete(raw_data[[column_name]])
            else:
                column_transform_info = self._fit_continuous(raw_data[[column_name]])

            self.output_info_list.append(column_transform_info.output_info)
            self.output_dimensions += column_transform_info.output_dimensions
            self._column_transform_info_list.append(column_transform_info)

    def _transform_continuous(self, column_transform_info, data):
        """
        연속형 컬럼 데이터를 변환하여 CTGAN 학습에 적합한 형식으로 만듭니다.
        
        Args:
            column_transform_info (ColumnTransformInfo):
                해당 컬럼의 변환 정보.
            data (pd.DataFrame):
                변환할 데이터를 포함한 DataFrame.
        
        Returns:
            np.array: 변환된 데이터를 담은 넘파이 배열.
        """
        column_name = data.columns[0]
        # 컬럼을 1차원 배열로 평탄화
        flattened_column = data[column_name].to_numpy().flatten()
        data = data.assign(**{column_name: flattened_column})
        gm = column_transform_info.transform
        transformed = gm.transform(data)

        # 출력 배열 초기화: 첫 번째 열은 정규화된 값, 나머지는 Gaussian 컴포넌트 정보를 one-hot 인코딩한 값
        output = np.zeros((len(transformed), column_transform_info.output_dimensions))
        output[:, 0] = transformed[f'{column_name}.normalized'].to_numpy()
        index = transformed[f'{column_name}.component'].to_numpy().astype(int)
        output[np.arange(index.size), index + 1] = 1.0

        return output

    def _transform_discrete(self, column_transform_info, data):
        """
        이산형 데이터를 One-Hot Encoding 방식으로 변환합니다.
        
        Args:
            column_transform_info (ColumnTransformInfo):
                해당 컬럼의 변환 정보.
            data (pd.DataFrame):
                변환할 데이터를 포함한 DataFrame.
        
        Returns:
            np.array: 원-핫 인코딩된 넘파이 배열.
        """
        ohe = column_transform_info.transform
        return ohe.transform(data).to_numpy()

    def _synchronous_transform(self, raw_data, column_transform_info_list):
        """
        Pandas DataFrame의 각 컬럼을 순차적으로 변환합니다.
        
        Args:
            raw_data (pd.DataFrame): 변환할 데이터.
            column_transform_info_list (list): 각 컬럼의 변환 정보 리스트.
        
        Returns:
            list: 각 컬럼별 변환된 넘파이 배열의 리스트.
        """
        column_data_list = []
        for column_transform_info in column_transform_info_list:
            column_name = column_transform_info.column_name
            data = raw_data[[column_name]]
            if column_transform_info.column_type == 'continuous':
                column_data_list.append(self._transform_continuous(column_transform_info, data))
            else:
                column_data_list.append(self._transform_discrete(column_transform_info, data))
        return column_data_list

    def _parallel_transform(self, raw_data, column_transform_info_list):
        """
        Pandas DataFrame의 각 컬럼을 병렬적으로 변환합니다.
        
        Args:
            raw_data (pd.DataFrame): 변환할 데이터.
            column_transform_info_list (list): 각 컬럼의 변환 정보 리스트.
        
        Returns:
            list: 각 컬럼별 변환된 넘파이 배열의 리스트.
        """
        processes = []
        for column_transform_info in column_transform_info_list:
            column_name = column_transform_info.column_name
            data = raw_data[[column_name]]
            process = None
            if column_transform_info.column_type == 'continuous':
                process = delayed(self._transform_continuous)(column_transform_info, data)
            else:
                process = delayed(self._transform_discrete)(column_transform_info, data)
            processes.append(process)
        return Parallel(n_jobs=-1)(processes)

    def transform(self, raw_data):
        """
        원본 데이터를 변환하여 CTGAN 학습에 사용할 행렬 형태의 데이터로 출력합니다.
        
        Args:
            raw_data (pd.DataFrame 또는 np.array): 변환할 원본 데이터.
        
        Returns:
            np.array: 모든 컬럼을 변환한 후 연결한 행렬 데이터.
        """
        if not isinstance(raw_data, pd.DataFrame):
            column_names = [str(num) for num in range(raw_data.shape[1])]
            raw_data = pd.DataFrame(raw_data, columns=column_names)

        # 데이터 크기가 작으면 순차적 변환, 크면 병렬 변환을 사용
        if raw_data.shape[0] < 500:
            column_data_list = self._synchronous_transform(
                raw_data, self._column_transform_info_list
            )
        else:
            column_data_list = self._parallel_transform(
                raw_data, self._column_transform_info_list
            )

        # 모든 컬럼의 변환 결과를 하나의 넘파이 배열로 연결
        return np.concatenate(column_data_list, axis=1).astype(float)

    def _inverse_transform_continuous(self, column_transform_info, column_data, sigmas, st):
        """
        연속형 데이터를 원본 형식으로 복원합니다.
        
        Args:
            column_transform_info (ColumnTransformInfo): 해당 컬럼의 변환 정보.
            column_data (np.array): 변환된 데이터.
            sigmas (list 또는 None): 복원 시 사용될 표준편차 값들.
            st (int): 현재 컬럼에 해당하는 시점(인덱스) 정보.
        
        Returns:
            pd.DataFrame: 복원된 데이터.
        """
        gm = column_transform_info.transform
        # 변환된 데이터를 DataFrame으로 변환 (정규화된 값과 컴포넌트 정보를 포함)
        data = pd.DataFrame(
            column_data[:, :2], columns=list(gm.get_output_sdtypes())
        ).astype(float)
        # 컴포넌트 정보는 원-핫 인코딩 형태이므로, argmax를 통해 선택된 컴포넌트 인덱스 추출
        data[data.columns[1]] = np.argmax(column_data[:, 1:], axis=1)
        if sigmas is not None:
            # 표준편차를 사용해 선택된 정규화된 값에 노이즈 추가 (복원 과정에서 변동성 부여)
            selected_normalized_value = np.random.normal(data.iloc[:, 0], sigmas[st])
            data.iloc[:, 0] = selected_normalized_value

        # 원래 데이터 형식으로 복원
        return gm.reverse_transform(data)

    def _inverse_transform_discrete(self, column_transform_info, column_data):
        """
        이산형 데이터를 원래 범주형 값으로 복원합니다.
        
        Args:
            column_transform_info (ColumnTransformInfo): 해당 컬럼의 변환 정보.
            column_data (np.array): 변환된 데이터.
        
        Returns:
            pd.Series: 복원된 범주형 데이터.
        """
        ohe = column_transform_info.transform
        data = pd.DataFrame(column_data, columns=list(ohe.get_output_sdtypes()))
        return ohe.reverse_transform(data)[column_transform_info.column_name]

    def inverse_transform(self, data, sigmas=None):
        """
        변환된 행렬 데이터를 원본 데이터 형식으로 복원합니다.
        
        입력 데이터 타입(np.array 또는 pd.DataFrame)에 따라 동일한 형식으로 반환합니다.
        
        Args:
            data (np.array): CTGAN 학습에 사용된 행렬 데이터.
            sigmas (list 또는 None): 복원 시 사용될 표준편차 값 (연속형 데이터에 한함).
        
        Returns:
            원본 형식 (np.array 또는 pd.DataFrame)의 복원된 데이터.
        """
        st = 0
        recovered_column_data_list = []
        column_names = []
        # 각 컬럼별로 저장된 변환 정보를 사용하여 데이터 복원
        for column_transform_info in self._column_transform_info_list:
            dim = column_transform_info.output_dimensions
            column_data = data[:, st:st + dim]
            if column_transform_info.column_type == 'continuous':
                recovered_column_data = self._inverse_transform_continuous(
                    column_transform_info, column_data, sigmas, st
                )
            else:
                recovered_column_data = self._inverse_transform_discrete(
                    column_transform_info, column_data
                )
            recovered_column_data_list.append(recovered_column_data)
            column_names.append(column_transform_info.column_name)
            st += dim

        # 복원된 각 컬럼 데이터를 하나의 배열로 결합 후 DataFrame으로 변환
        recovered_data = np.column_stack(recovered_column_data_list)
        recovered_data = (pd.DataFrame(recovered_data, columns=column_names)
                          .astype(self._column_raw_dtypes))
        # 원본 데이터가 DataFrame이 아니었다면, 넘파이 배열로 반환
        if not self.dataframe:
            recovered_data = recovered_data.to_numpy()

        return recovered_data

    def convert_column_name_value_to_id(self, column_name, value):
        """
        주어진 컬럼 이름과 값에 해당하는 ID를 반환합니다.
        
        Args:
            column_name (str): 조건을 지정할 컬럼 이름.
            value (str 또는 해당 타입): 해당 컬럼에서 찾고자 하는 값.
        
        Returns:
            dict:
                {'discrete_column_id': int, 'column_id': int, 'value_id': int}
                - discrete_column_id: 이산형 컬럼 내에서의 순서 번호.
                - column_id: 전체 컬럼 리스트 내에서의 인덱스.
                - value_id: One-Hot 인코딩 결과에서 가장 큰 값을 가지는 인덱스.
        
        예외:
            주어진 컬럼 이름 또는 값이 존재하지 않으면 ValueError 발생.
        """
        discrete_counter = 0
        column_id = 0
        # _column_transform_info_list를 순회하며 해당 컬럼을 찾음
        for column_transform_info in self._column_transform_info_list:
            if column_transform_info.column_name == column_name:
                break
            if column_transform_info.column_type == 'discrete':
                discrete_counter += 1
            column_id += 1
        else:
            raise ValueError(f"The column_name `{column_name}` doesn't exist in the data.")

        # 해당 컬럼의 OneHotEncoder를 사용하여 주어진 값이 원-핫 인코딩된 결과를 얻음
        ohe = column_transform_info.transform
        data = pd.DataFrame([value], columns=[column_transform_info.column_name])
        one_hot = ohe.transform(data).to_numpy()[0]
        if sum(one_hot) == 0:
            raise ValueError(f"The value `{value}` doesn't exist in the column `{column_name}`.")

        return {
            'discrete_column_id': discrete_counter,
            'column_id': column_id,
            'value_id': np.argmax(one_hot)
        }


# G, D, Q 네트워크 생성

In [None]:
import torch
from torch import nn
from torch.nn import Module, Linear, LeakyReLU, Dropout, Sequential

class Discriminator(Module):
    """
    CTGAN용 Discriminator 클래스.
    이 모델은 InfoGAN의 개념을 도입하여, D_head와 Q_head를 포함합니다.
    - D_head: 진짜와 가짜를 판별하는 역할.
    - Q_head: 잠재 코드(latent code)를 예측하여 정보 최대화(Mutual Information)를 유도합니다.
    """

    def __init__(self, input_dim, discriminator_dim, latent_dim, pac=10):
        """
        Discriminator 초기화.
        
        Args:
            input_dim (int): 원본 데이터의 차원.
            discriminator_dim (list): 은닉층의 노드 수 리스트.
            latent_dim (int): Q_head에서 예측할 잠재 코드 차원.
            pac (int): PacGAN에서 사용하는 패치 크기 (한 묶음에 포함할 샘플 수).
        """
        super(Discriminator, self).__init__()
        # PacGAN 기법을 적용하기 위해 입력 차원을 pac 배로 확장
        dim = input_dim * pac
        self.pac = pac
        self.pacdim = dim

        # D_head와 Q_head가 공유하는 신경망 계층 구성
        seq = []
        for item in list(discriminator_dim):
            # 각 Linear 계층 뒤에 LeakyReLU와 Dropout을 적용
            seq += [Linear(dim, item), LeakyReLU(0.2), Dropout(0.5)]
            dim = item  # 출력 차원을 다음 계층의 입력 차원으로 사용

        self.shared = Sequential(*seq)

        # D_head: 진짜/가짜 판별을 위한 선형 계층 (출력 차원 1)
        self.D_head = Linear(dim, 1)

        # Q_head: 잠재 코드 예측을 위한 계층들
        # Q_mu: 잠재 코드의 평균 예측
        self.Q_mu = Linear(dim, latent_dim)
        # Q_var: 잠재 코드의 분산 예측 (분산은 양수여야 하므로, forward에서 exp 함수를 적용)
        self.Q_var = Linear(dim, latent_dim)

    def calc_gradient_penalty(self, real_data, fake_data, device='cpu', pac=10, lambda_=10):
        """
        WGAN-GP를 위한 Gradient Penalty를 계산합니다.
        
        Args:
            real_data (torch.Tensor): 실제 데이터.
            fake_data (torch.Tensor): 생성된 가짜 데이터.
            device (str): 사용할 디바이스 (예: 'cpu' 또는 'cuda').
            pac (int): PacGAN에서 사용하는 샘플 묶음 크기.
            lambda_ (float): Gradient Penalty 계수.
        
        Returns:
            torch.Tensor: 계산된 Gradient Penalty 값.
        """
        # 각 pac 묶음에 대해 난수를 생성하여 보간(interpolation)에 사용할 가중치 생성
        alpha = torch.rand(real_data.size(0) // pac, 1, 1, device=device)
        alpha = alpha.repeat(1, pac, real_data.size(1))
        alpha = alpha.view(-1, real_data.size(1))

        # 실제 데이터와 가짜 데이터 사이의 선형 보간(interpolates) 계산
        interpolates = alpha * real_data + ((1 - alpha) * fake_data)

        # 보간된 데이터에 대해 Discriminator를 적용하여 출력 계산
        disc_interpolates = self(interpolates)[0]

        # 보간된 데이터에 대한 gradient 계산
        gradients = torch.autograd.grad(
            outputs=disc_interpolates, 
            inputs=interpolates,
            grad_outputs=torch.ones(disc_interpolates.size(), device=device),
            create_graph=True, 
            retain_graph=True, 
            only_inputs=True
        )[0]

        # gradient의 L2 norm 계산 후 1에서 빼고 제곱, 평균을 내어 Gradient Penalty 계산
        gradients_view = gradients.view(-1, pac * real_data.size(1)).norm(2, dim=1) - 1
        gradient_penalty = ((gradients_view) ** 2).mean() * lambda_

        return gradient_penalty

    def forward(self, input_):
        """
        Discriminator의 순전파 과정.
        입력 데이터를 받아, D_head와 Q_head를 통해 진짜/가짜 판별 결과와 잠재 코드 예측 결과를 반환합니다.
        
        Args:
            input_ (torch.Tensor): 입력 데이터.
        
        Returns:
            tuple: (D_head 출력, Q_head에서 예측한 평균(mu), Q_head에서 예측한 분산(var))
        """
        batch_size = input_.size(0)
        # PacGAN 적용 시, 배치 크기가 pac의 배수인지 확인
        assert batch_size % self.pac == 0

        # pac 단위로 데이터를 묶어 하나의 긴 벡터로 변환
        combined_input = input_.view(batch_size // self.pac, -1)
        # 공유 계층을 통해 특징 추출
        shared_output = self.shared(combined_input)

        # D_head를 통해 진짜/가짜 판별 결과 계산
        disc_output = self.D_head(shared_output)

        # Q_head를 통해 잠재 코드의 평균과 분산 예측
        mu = self.Q_mu(shared_output)
        var = torch.exp(self.Q_var(shared_output))  # 분산은 항상 양수가 되어야 하므로 exp 적용

        # Q_head의 예측 결과를 원래 배치 크기에 맞게 반복
        mu = mu.repeat(self.pac, 1)
        var = var.repeat(self.pac, 1)

        # 최종적으로, D_head의 출력, 평균, 분산을 반환 (배치 사이즈에 맞게 reshape)
        return disc_output, mu.view(batch_size, -1), var.view(batch_size, -1)

# # 테스트 코드
# # 설정 파라미터
# input_dim = 128         # 입력 데이터 차원
# discriminator_dim = [256, 128]  # 은닉층 크기 리스트
# latent_dim = 10         # Q_head에서 예측할 잠재 코드 차원
# pac = 10                # PacGAN에서 사용할 샘플 묶음 크기
# batch_size = 100        # 배치 크기

# # Discriminator 객체 생성
# discriminator = Discriminator(input_dim, discriminator_dim, latent_dim, pac=pac)

# # 테스트용 랜덤 데이터 생성
# real_data = torch.randn(batch_size, input_dim)
# fake_data = torch.randn(batch_size, input_dim)

# # Discriminator를 통해 실제 데이터에 대한 출력 확인
# disc_output, mu, var = discriminator(real_data)
# print(disc_output.shape, mu.shape, var.shape)


torch.Size([10, 1]) torch.Size([100, 10]) torch.Size([100, 10])


In [None]:
import torch
from torch.nn import Module, Linear, BatchNorm1d, ReLU

class Residual(Module):
    """
    CTGAN에서 사용되는 Residual Layer.
    이 레이어는 입력 데이터와 잠재 코드(latent code)를 결합하여 처리한 후,
    원본 입력을 skip connection 방식으로 최종 출력에 다시 추가하여 정보를 보존합니다.
    """

    def __init__(self, i, o, latent_dim):
        """
        Residual 레이어를 초기화합니다.
        
        Args:
            i (int): 입력 데이터의 차원.
            o (int): 이 레이어에서 생성될 출력 차원.
            latent_dim (int): 잠재 코드의 차원.
        """
        super(Residual, self).__init__()
        # 입력 데이터와 잠재 코드를 결합한 후, o 차원으로 선형 변환
        self.fc = Linear(i + latent_dim, o)
        # 선형 변환의 출력을 정규화하여 학습 안정성을 높임
        self.bn = BatchNorm1d(o)
        # ReLU 활성화 함수 적용하여 비선형성을 부여
        self.relu = ReLU()
        self.latent_dim = latent_dim

    def forward(self, input_, latent_code):
        """
        입력 데이터와 잠재 코드를 받아 Residual 연결을 적용합니다.
        
        Args:
            input_ (torch.Tensor): 입력 데이터 텐서 (batch_size, input_dim).
            latent_code (torch.Tensor): 잠재 코드 텐서 (batch_size, latent_dim).
        
        Returns:
            torch.Tensor: Residual 연결이 적용된 결과 텐서.
        """
        # latent_code를 입력 데이터의 배치 크기에 맞게 확장 (batch_size x latent_dim)
        latent_code_expanded = latent_code.expand(input_.size(0), self.latent_dim)
        # 입력 데이터와 확장된 잠재 코드를 열 방향으로 결합
        combined_input = torch.cat([input_, latent_code_expanded], dim=1)
        # 결합된 입력에 대해 선형 변환 -> 배치 정규화 -> ReLU 활성화 함수 적용
        out = self.fc(combined_input)
        out = self.bn(out)
        out = self.relu(out)
        # 최종 출력은 선형 변환 결과와 원본 입력을 다시 결합하여, skip connection을 적용함
        return torch.cat([out, input_], dim=1)


In [None]:
from torch.nn import Sequential

class Generator(Module):
    """
    CTGAN에서 사용되는 Generator.
    이 생성기는 입력된 노이즈 벡터(embedding)와 잠재 코드(latent code)를 결합하여
    최종적으로 CTGAN이 생성할 데이터(data_dim 차원)를 출력합니다.
    
    Generator는 Residual Block을 반복적으로 쌓아 깊은 네트워크를 구성하며,
    각 Residual Block은 입력 데이터와 잠재 코드 정보를 결합하여 처리한 후,
    입력 정보를 skip connection 방식으로 추가합니다.
    """
    
    def __init__(self, embedding_dim, generator_dim, data_dim, latent_dim):
        """
        Generator 초기화.
        
        Args:
            embedding_dim (int): 입력 노이즈 벡터의 차원.
            generator_dim (list or tuple): 각 Residual Block에서 사용할 은닉층의 출력 차원 리스트.
            data_dim (int): 최종 생성할 데이터의 차원.
            latent_dim (int): 잠재 코드의 차원.
        """
        super(Generator, self).__init__()
        # 초기 입력 차원은 embedding_dim (노이즈 벡터 차원)
        dim = embedding_dim
        self.latent_dim = latent_dim
        seq = []  # Generator의 각 레이어를 순차적으로 저장할 리스트
        
        # generator_dim에 지정된 각 값에 대해 Residual Block을 추가
        for item in list(generator_dim):
            # Residual Block: 입력 차원(dim)과 출력 차원(item), 그리고 latent_dim을 사용
            seq += [Residual(dim, item, latent_dim)]
            # Residual 연결로 인해 출력 차원이 증가: 기존 입력에 item 만큼의 출력이 더해짐
            dim += item
        
        # 마지막 Fully Connected Layer를 추가하여 최종 데이터 차원(data_dim)으로 변환
        seq.append(Linear(dim, data_dim))
        
        # Sequential 컨테이너에 레이어들을 담아 순차적으로 적용할 수 있도록 함
        self.seq = Sequential(*seq)

    def forward(self, input_, latent_code):
        """
        Generator의 순전파 과정.
        
        Args:
            input_ (torch.Tensor): 입력 노이즈 벡터 (batch_size, embedding_dim).
            latent_code (torch.Tensor): 잠재 코드 (batch_size, latent_dim).
        
        Returns:
            torch.Tensor: 생성된 데이터 (batch_size, data_dim).
        """
        # 초기 입력으로 노이즈 벡터를 사용
        data = input_
        # Sequential 컨테이너에 저장된 모든 레이어를 순차적으로 적용
        for layer in self.seq:
            # Residual 레이어는 latent_code도 함께 입력해야 함
            if isinstance(layer, Residual):
                data = layer(data, latent_code)
            else:
                # 마지막 Fully Connected Layer 등 latent_code가 필요 없는 경우
                data = layer(data)
        return data


In [None]:
def sample(self, n, condition_column=None, condition_value=None):
    """
    훈련 데이터와 유사한 샘플 데이터를 생성합니다.
    
    특정 condition_column과 condition_value를 지정하면 해당 조건을 
    반영하여 샘플링할 확률이 높아집니다.
    
    Args:
        n (int): 생성할 행(row)의 개수.
        condition_column (str): 조건을 적용할 이산형 컬럼 이름.
        condition_value (str): condition_column에서 우선 발생시킬 범주 값.
    
    Returns:
        numpy.ndarray 또는 pandas.DataFrame: 원본 데이터와 유사한 생성된 샘플 데이터.
    """
    # 조건 컬럼과 값이 지정된 경우, 해당 조건에 맞는 조건 벡터를 생성
    if condition_column is not None and condition_value is not None:
        # DataTransformer를 통해 컬럼 이름과 값에 해당하는 ID 정보를 얻음
        condition_info = self._transformer.convert_column_name_value_to_id(
            condition_column, condition_value)
        # DataSampler를 사용해 해당 조건을 반영한 조건 벡터 생성
        global_condition_vec = self._data_sampler.generate_cond_from_condition_column_info(
            condition_info, self._batch_size)
    else:
        # 조건이 없으면 global_condition_vec는 None으로 설정
        global_condition_vec = None

    # 전체 n개의 샘플을 생성하기 위해 필요한 배치 반복 횟수 계산
    steps = n // self._batch_size + 1
    data = []  # 생성된 데이터 저장 리스트

    for i in range(steps):
        # 1. 노이즈 벡터 생성: 평균 0, 표준편차 1인 정규분포를 따르는 벡터 생성
        mean = torch.zeros(self._batch_size, self._embedding_dim)
        std = mean + 1
        fakez = torch.normal(mean=mean, std=std).to(self._device)

        # 2. 조건 벡터 적용:
        #    조건 벡터가 지정되어 있으면 그 벡터를 복사,
        #    그렇지 않으면 원본 데이터 분포를 반영한 조건 벡터 생성
        if global_condition_vec is not None:
            condvec = global_condition_vec.copy()
        else:
            condvec = self._data_sampler.sample_original_condvec(self._batch_size)

        # 3. 조건 벡터가 존재할 경우, NumPy 배열을 PyTorch Tensor로 변환 후 노이즈 벡터와 결합
        if condvec is not None:
            c1 = torch.from_numpy(condvec).to(self._device)
            fakez = torch.cat([fakez, c1], dim=1)

        # 4. Generator를 이용해 가짜 데이터 생성
        fake = self._generator(fakez)
        # 5. Generator 출력에 대해 활성화 함수를 적용하여 적절한 값으로 변환
        fakeact = self._apply_activate(fake)
        # 6. 생성된 데이터를 CPU로 옮기고 NumPy 배열로 변환하여 저장
        data.append(fakeact.detach().cpu().numpy())

    # 모든 배치의 데이터를 하나의 배열로 결합하고, n개의 샘플만 선택
    data = np.concatenate(data, axis=0)
    data = data[:n]

    # DataTransformer의 inverse_transform을 통해 생성된 데이터를 원본 데이터 형식으로 복원하여 반환
    return self._transformer.inverse_transform(data)


In [None]:
import numpy as np
import torch

class MutualInformationLoss(torch.nn.Module):
    """
    상호정보 손실(Mutual Information Loss) 모듈.
    InfoGAN 구조에서 Q 네트워크가 잠재 변수(latent code)를 복원하도록 유도하기 위해,
    실제 잠재 변수(x)와 Q 네트워크가 예측한 평균(mu) 및 분산(var) 사이의 음의 로그 가능도(NLL)를 계산합니다.
    """
    def __init__(self):
        super(MutualInformationLoss, self).__init__()
    
    def forward(self, x, mu, var):
        """
        실제 잠재 변수(x)와 Q 네트워크의 예측(mu, var) 간의 음의 로그 가능도(NLL)를 계산합니다.
        
        Args:
            x (torch.Tensor): 실제 잠재 변수, shape: (batch_size, latent_dim)
            mu (torch.Tensor): Q 네트워크가 예측한 평균, shape: (batch_size, latent_dim)
            var (torch.Tensor): Q 네트워크가 예측한 분산, shape: (batch_size, latent_dim)
        
        Returns:
            torch.Tensor: 계산된 상호정보 손실 값 (스칼라)
        """
        # 정규 분포의 로그 가능도(log-likelihood) 계산:
        # 첫 번째 항: 분산에 대한 로그 항 (-0.5 * log(2π * var))
        # 두 번째 항: (x - mu)^2 / (2 * var)
        logli = -0.5 * (var.mul(2 * np.pi) + 1e-6).log() - (x - mu).pow(2).div(var.mul(2.0) + 1e-6)
        
        # 각 샘플에 대해 로그 가능도 합산 후 평균을 취하고, 부호를 반전하여 음의 로그 가능도(NLL)를 계산
        nll = -(logli.sum(1).mean())
        
        return nll


In [None]:
# CTGAN
from torch import nn, optim
from tqdm import tqdm
from torch.nn import functional
import warnings

import numpy as np
import pandas as pd
import torch
from torch import optim
from torch.nn import BatchNorm1d, Dropout, LeakyReLU, Linear, Module, ReLU, Sequential, functional
from tqdm import tqdm


class CTGAN(BaseSynthesizer):
    """
    Conditional Table GAN Synthesizer 클래스.
    
    CTGAN은 테이블 데이터를 조건부로 생성하기 위한 GAN 모델입니다.
    자세한 내용은 논문 "Modeling Tabular data using Conditional GAN"을 참고하세요.
    
    인자:
        embedding_dim (int): Generator에 입력될 랜덤 샘플(노이즈 벡터)의 차원. (기본값: 128)
        generator_dim (tuple or list of ints): Generator의 각 Residual Layer의 출력 차원. (기본값: (256, 256))
        discriminator_dim (tuple or list of ints): Discriminator의 각 Linear Layer의 출력 차원. (기본값: (256, 256))
        latent_dim (int): 추가로 사용할 잠재 코드의 차원. (기본값: 1)
        generator_lr (float): Generator 학습률. (기본값: 2e-4)
        generator_decay (float): Generator 가중치 감쇠. (기본값: 1e-6)
        discriminator_lr (float): Discriminator 학습률. (기본값: 2e-4)
        discriminator_decay (float): Discriminator 가중치 감쇠. (기본값: 1e-6)
        batch_size (int): 각 학습 단계에서 처리할 샘플 수. (기본값: 500)
        discriminator_steps (int): Generator 업데이트 당 Discriminator 업데이트 횟수. (기본값: 1)
        log_frequency (bool): 이산형 변수 샘플링 시 로그 빈도 사용 여부. (기본값: True)
        verbose (bool): 학습 진행 상황을 출력할지 여부. (기본값: False)
        epochs (int): 총 학습 에폭 수. (기본값: 300)
        pac (int): PacGAN에서 한 묶음으로 처리할 샘플 수. (기본값: 10)
        cuda (bool): GPU 사용 여부. (기본값: True; GPU 사용 가능하지 않으면 CPU 사용)
    """

    def __init__(self, embedding_dim=128, generator_dim=(256, 256), discriminator_dim=(256, 256),
                 latent_dim=1, generator_lr=2e-4, generator_decay=1e-6, discriminator_lr=2e-4,
                 discriminator_decay=1e-6, batch_size=500, discriminator_steps=1,
                 log_frequency=True, verbose=False, epochs=300, pac=10, cuda=True):

        # 배치 크기는 짝수여야 함을 확인
        assert batch_size % 2 == 0

        # 기본 하이퍼파라미터 설정
        self._embedding_dim = embedding_dim
        self._generator_dim = generator_dim
        self._discriminator_dim = discriminator_dim
        self._latent_dim = latent_dim  # 추가되는 잠재 코드 차원

        self._generator_lr = generator_lr
        self._generator_decay = generator_decay
        self._discriminator_lr = discriminator_lr
        self._discriminator_decay = discriminator_decay

        self._batch_size = batch_size
        self._discriminator_steps = discriminator_steps
        self._log_frequency = log_frequency
        self._verbose = verbose
        self._epochs = epochs
        self.pac = pac

        # 디바이스 설정: cuda 인자와 GPU 사용 가능 여부에 따라 CPU 또는 GPU 선택
        if not cuda or not torch.cuda.is_available():
            device = 'cpu'
        elif isinstance(cuda, str):
            device = cuda
        else:
            device = 'cuda'

        self._device = torch.device(device)

        # 이후 학습 및 샘플링 시 사용할 객체들 초기화
        self._transformer = None            # 데이터 전처리를 위한 DataTransformer
        self._data_sampler = None           # 조건 벡터 및 샘플 데이터를 위한 DataSampler
        self._generator = None              # 생성기 네트워크
        self.loss_values = None             # 에폭별 손실 값 기록용 DataFrame
        self._discriminator = None          # 판별기 네트워크
        self._q_network = None              # Q 네트워크 (잠재 코드 예측용; InfoGAN 아이디어)
        self.mutual_information_loss = MutualInformationLoss()  # 상호정보 손실 모듈

    @staticmethod
    def _gumbel_softmax(logits, tau=1, hard=False, eps=1e-10, dim=-1):
        """
        구블 소프트맥스(gumbel_softmax)의 불안정성을 완화하기 위한 함수.
        
        인자:
            logits: 정규화되지 않은 로그 확률들 (..., num_features)
            tau: 온도 하이퍼파라미터 (양수 스칼라)
            hard (bool): True이면, 반환된 샘플을 one-hot 벡터로 만들지만 미분은 soft sample로 처리.
            eps: 수치 안정성을 위한 작은 값.
            dim: 소프트맥스를 적용할 차원 (기본값: -1)
        
        반환:
            logits와 동일한 shape의 gumbel_softmax 샘플.
        """
        for _ in range(10):
            transformed = functional.gumbel_softmax(logits, tau=tau, hard=hard, eps=eps, dim=dim)
            if not torch.isnan(transformed).any():
                return transformed

        raise ValueError('gumbel_softmax returning NaN.')

    def _apply_activate(self, data):
        """
        Generator의 출력 데이터에 대해 각 컬럼별 활성화 함수(tanh 또는 softmax)를 적용합니다.
        
        DataTransformer에 의해 설정된 output_info_list를 기반으로,
        각 컬럼마다 적절한 활성화 함수를 적용하여 결과를 하나의 텐서로 결합합니다.
        """
        data_t = []
        st = 0
        # 각 컬럼의 변환 정보에 따라 활성화 함수 적용
        for column_info in self._transformer.output_info_list:
            for span_info in column_info:
                if span_info.activation_fn == 'tanh':
                    ed = st + span_info.dim
                    data_t.append(torch.tanh(data[:, st:ed]))
                    st = ed
                elif span_info.activation_fn == 'softmax':
                    ed = st + span_info.dim
                    transformed = self._gumbel_softmax(data[:, st:ed], tau=0.2)
                    data_t.append(transformed)
                    st = ed
                else:
                    raise ValueError(f'Unexpected activation function {span_info.activation_fn}.')
        return torch.cat(data_t, dim=1)

    def _cond_loss(self, data, c, m):
        """
        고정된 이산형 컬럼에 대해 cross entropy 손실을 계산합니다.
        
        인자:
            data: Generator 출력 데이터.
            c: 조건 벡터.
            m: 마스크 벡터 (어떤 컬럼이 선택되었는지 나타냄).
        
        반환:
            계산된 조건부 손실 (cross entropy loss).
        """
        loss = []
        st = 0
        st_c = 0
        # DataTransformer의 output_info_list를 순회하며 이산형 컬럼인 경우 cross entropy 계산
        for column_info in self._transformer.output_info_list:
            for span_info in column_info:
                if len(column_info) != 1 or span_info.activation_fn != 'softmax':
                    # 이산형 컬럼이 아니면 스킵
                    st += span_info.dim
                else:
                    ed = st + span_info.dim
                    ed_c = st_c + span_info.dim
                    tmp = functional.cross_entropy(
                        data[:, st:ed],
                        torch.argmax(c[:, st_c:ed_c], dim=1),
                        reduction='none'
                    )
                    loss.append(tmp)
                    st = ed
                    st_c = ed_c

        loss = torch.stack(loss, dim=1)  # 각 컬럼별 손실을 하나의 텐서로 결합

        return (loss * m).sum() / data.size()[0]

    def _validate_discrete_columns(self, train_data, discrete_columns):
        """
        학습 데이터(train_data)에 지정한 discrete_columns(이산형 컬럼)가 존재하는지 확인합니다.
        
        인자:
            train_data (numpy.ndarray 또는 pandas.DataFrame): 2차원 학습 데이터.
            discrete_columns (list-like): 조건 벡터 생성을 위한 이산형 컬럼 목록.
                - DataFrame인 경우 컬럼 이름 리스트.
                - numpy array인 경우 컬럼 인덱스 리스트.
        
        예외:
            유효하지 않은 컬럼이 발견되면 ValueError 발생.
        """
        if isinstance(train_data, pd.DataFrame):
            invalid_columns = set(discrete_columns) - set(train_data.columns)
        elif isinstance(train_data, np.ndarray):
            invalid_columns = []
            for column in discrete_columns:
                if column < 0 or column >= train_data.shape[1]:
                    invalid_columns.append(column)
        else:
            raise TypeError('`train_data` should be either pd.DataFrame or np.array.')

        if invalid_columns:
            raise ValueError(f'Invalid columns found: {invalid_columns}')

    @random_state
    def fit(self, train_data, discrete_columns=(), epochs=None):
        """
        CTGAN 모델을 학습하기 위해 학습 데이터를 전처리하고, 
        Generator, Discriminator, DataSampler 등 관련 구성 요소들을 초기화합니다.
        
        Args:
            train_data (numpy.ndarray 또는 pandas.DataFrame): 2차원 학습 데이터.
            discrete_columns (list-like): 조건 벡터 생성을 위한 이산형 컬럼 목록.
            epochs: 학습 에폭 수. (생성자에서 지정한 값을 기본으로 사용)
        
        동작:
            1. discrete_columns의 유효성 검사.
            2. DataTransformer를 사용하여 데이터를 변환.
            3. DataSampler 생성.
            4. Generator와 Discriminator 네트워크 생성 및 옵티마이저 설정.
            5. 각 에폭마다 Generator와 Discriminator를 번갈아 업데이트하며 학습.
            6. 에폭별 손실값을 기록.
        """
        # 이산형 컬럼이 학습 데이터에 존재하는지 검증
        self._validate_discrete_columns(train_data, discrete_columns)

        if epochs is None:
            epochs = self._epochs
        else:
            warnings.warn(
                ('`epochs` argument in `fit` method has been deprecated and will be removed '
                 'in a future version. Please pass `epochs` to the constructor instead'),
                DeprecationWarning
            )

        # 데이터 전처리: DataTransformer를 사용하여 원본 데이터를 변환
        self._transformer = DataTransformer()
        self._transformer.fit(train_data, discrete_columns)
        train_data = self._transformer.transform(train_data)

        # DataSampler 생성: 조건 벡터 및 샘플 데이터를 위한 객체
        self._data_sampler = DataSampler(
            train_data,
            self._transformer.output_info_list,
            self._log_frequency)

        data_dim = self._transformer.output_dimensions

        # Generator 생성 (latent code 추가)
        self._generator = Generator(
            self._embedding_dim + self._data_sampler.dim_cond_vec(),
            self._generator_dim,
            data_dim,
            self._latent_dim  # latent_dim 추가
        ).to(self._device)

        # Discriminator 생성 (latent code 추가)
        self._discriminator = Discriminator(
            data_dim + self._data_sampler.dim_cond_vec(),
            self._discriminator_dim,
            latent_dim=self._latent_dim,  # latent_dim 추가
            pac=self.pac
        ).to(self._device)

        # 옵티마이저 설정: Adam 옵티마이저 사용
        optimizerG = optim.Adam(
            self._generator.parameters(), lr=self._generator_lr, betas=(0.5, 0.9),
            weight_decay=self._generator_decay
        )

        optimizerD = optim.Adam(
            self._discriminator.parameters(), lr=self._discriminator_lr,
            betas=(0.5, 0.9), weight_decay=self._discriminator_decay
        )

        # 노이즈 벡터 생성을 위한 평균과 표준편차 텐서 생성
        mean = torch.zeros(self._batch_size, self._embedding_dim, device=self._device)
        std = torch.ones(self._batch_size, self._embedding_dim, device=self._device)

        # 에폭별 손실값을 기록할 DataFrame 초기화
        self.loss_values = pd.DataFrame(columns=['Epoch', 'Generator Loss', 'Discriminator Loss'])

        # 학습 진행률 표시를 위한 tqdm 이터레이터 설정
        epoch_iterator = tqdm(range(epochs), disable=(not self._verbose))
        if self._verbose:
            description = 'Gen. ({gen:.2f}) | Discrim. ({dis:.2f})'
            epoch_iterator.set_description(description.format(gen=0, dis=0))

        # 에폭 당 스텝 수 계산 (전체 데이터를 배치 크기로 나눈 값, 최소 1)
        steps_per_epoch = max(len(train_data) // self._batch_size, 1)
        for i in epoch_iterator:
            for id_ in range(steps_per_epoch):

                # Discriminator 업데이트: 지정된 횟수만큼 반복
                for n in range(self._discriminator_steps):
                    fakez = torch.normal(mean=mean, std=std)
                    # Uniform 분포에서 latent code 생성 (범위: -1 ~ 1)
                    latent_code = torch.rand(self._batch_size, self._latent_dim, device=self._device) * 2 - 1  

                    # 조건 벡터 샘플링
                    condvec = self._data_sampler.sample_condvec(self._batch_size)
                    if condvec is None:
                        c1, m1, col, opt = None, None, None, None
                        real = self._data_sampler.sample_data(
                            train_data, self._batch_size, col, opt)
                    else:
                        c1, m1, col, opt = condvec
                        c1 = torch.from_numpy(c1).to(self._device)
                        m1 = torch.from_numpy(m1).to(self._device)
                        fakez = torch.cat([fakez, c1], dim=1)

                        # 데이터 샘플의 순서를 무작위로 섞음
                        perm = np.arange(self._batch_size)
                        np.random.shuffle(perm)
                        real = self._data_sampler.sample_data(
                            train_data, self._batch_size, col[perm], opt[perm])
                        c2 = c1[perm]
                    # Generator에 노이즈와 latent code (및 조건 벡터) 입력하여 가짜 데이터 생성
                    fake = self._generator(fakez, latent_code)  # latent code 추가
                    fakeact = self._apply_activate(fake)

                    real = torch.from_numpy(real.astype('float32')).to(self._device)

                    if c1 is not None:
                        fake_cat = torch.cat([fakeact, c1], dim=1)
                        real_cat = torch.cat([real, c2], dim=1)
                    else:
                        real_cat = real
                        fake_cat = fakeact

                    # Discriminator를 통한 진짜/가짜 판별 결과 계산
                    y_fake, _, _ = self._discriminator(fake_cat)
                    y_real, _, _ = self._discriminator(real_cat)

                    # Gradient Penalty 계산 (WGAN-GP)
                    pen = self._discriminator.calc_gradient_penalty(
                        real_cat, fake_cat, self._device, self.pac)
                    loss_d = -(torch.mean(y_real) - torch.mean(y_fake))

                    optimizerD.zero_grad(set_to_none=False)
                    pen.backward(retain_graph=True)
                    loss_d.backward()
                    optimizerD.step()

                # Generator 업데이트 단계
                fakez = torch.normal(mean=mean, std=std)
                latent_code = torch.rand(self._batch_size, self._latent_dim, device=self._device) * 2 - 1  # latent code 생성
                condvec = self._data_sampler.sample_condvec(self._batch_size)

                if condvec is None:
                    c1, m1, col, opt = None, None, None, None
                else:
                    c1, m1, col, opt = condvec
                    c1 = torch.from_numpy(c1).to(self._device)
                    m1 = torch.from_numpy(m1).to(self._device)
                    fakez = torch.cat([fakez, c1], dim=1)

                fake = self._generator(fakez, latent_code)  # latent code 추가
                fakeact = self._apply_activate(fake)

                if c1 is not None:
                    y_fake, mu, var = self._discriminator(torch.cat([fakeact, c1], dim=1))
                else:
                    y_fake, mu, var = self._discriminator(fakeact)

                if condvec is None:
                    cross_entropy = 0
                else:
                    cross_entropy = self._cond_loss(fake, c1, m1)

                # Mutual Information Loss 계산: latent code와 Discriminator의 Q_head 예측 간 차이
                mi_loss = self.mutual_information_loss(latent_code, mu, var)  

                # Generator 손실: -Discriminator 출력의 평균 + cross entropy + mutual information loss
                loss_g = -torch.mean(y_fake) + cross_entropy + mi_loss  

                optimizerG.zero_grad(set_to_none=False)
                loss_g.backward()
                optimizerG.step()

            # 에폭별 손실값 기록
            generator_loss = loss_g.detach().cpu().item()
            discriminator_loss = loss_d.detach().cpu().item()

            epoch_loss_df = pd.DataFrame({
                'Epoch': [i],
                'Generator Loss': [generator_loss],
                'Discriminator Loss': [discriminator_loss]
            })
            if not self.loss_values.empty:
                self.loss_values = pd.concat(
                    [self.loss_values, epoch_loss_df]
                ).reset_index(drop=True)
            else:
                self.loss_values = epoch_loss_df

            if self._verbose:
                epoch_iterator.set_description(
                    description.format(gen=generator_loss, dis=discriminator_loss)
                )

    def sample(self, n, condition_column=None, condition_value=None):
        """
        학습된 모델을 사용하여 원본 데이터와 유사한 데이터를 샘플링합니다.
        
        조건으로 condition_column과 condition_value를 지정하면 해당 범주가 반영될 확률이 높아집니다.
        
        Args:
            n (int): 생성할 샘플 행(row)의 개수.
            condition_column (str): 조건을 적용할 이산형 컬럼 이름.
            condition_value (str): condition_column에서 우선 발생시킬 범주 값.
        
        Returns:
            numpy.ndarray 또는 pandas.DataFrame: 생성된 샘플 데이터.
        """
        # 조건이 지정된 경우: 해당 조건을 반영한 조건 벡터 생성
        if condition_column is not None and condition_value is not None:
            condition_info = self._transformer.convert_column_name_value_to_id(
                condition_column, condition_value)
            global_condition_vec = self._data_sampler.generate_cond_from_condition_column_info(
                condition_info, self._batch_size)
        else:
            global_condition_vec = None

        # n개의 샘플 생성을 위해 필요한 배치 수 계산
        steps = n // self._batch_size + 1
        data = []
        for i in range(steps):
            # 노이즈 벡터 생성: 평균 0, 표준편차 1인 정규분포 따름
            mean = torch.zeros(self._batch_size, self._embedding_dim)
            std = mean + 1
            fakez = torch.normal(mean=mean, std=std).to(self._device)
            # latent code 생성: 균등 분포, 범위 -1 ~ 1
            latent_code = torch.rand(self._batch_size, self._latent_dim, device=self._device) * 2 - 1  

            # 조건 벡터 적용: 지정된 조건 벡터가 있으면 복사, 없으면 원본 데이터 분포 기반 조건 벡터 샘플링
            if global_condition_vec is not None:
                condvec = global_condition_vec.copy()
            else:
                condvec = self._data_sampler.sample_original_condvec(self._batch_size)

            # 조건 벡터가 존재하는 경우, 이를 노이즈 벡터에 결합
            if condvec is not None:
                c1 = condvec
                c1 = torch.from_numpy(c1).to(self._device)
                fakez = torch.cat([fakez, c1], dim=1)

            # Generator를 이용해 가짜 데이터 생성 (latent code 추가)
            fake = self._generator(fakez, latent_code)
            # Generator 출력에 활성화 함수 적용
            fakeact = self._apply_activate(fake)
            # 생성된 결과를 CPU로 옮기고 NumPy 배열로 변환 후 저장
            data.append(fakeact.detach().cpu().numpy())

        # 배치별 생성된 데이터를 하나의 배열로 결합하고, n개의 샘플만 선택
        data = np.concatenate(data, axis=0)
        data = data[:n]

        # DataTransformer의 inverse_transform을 통해 원본 데이터 형식으로 복원하여 반환
        return self._transformer.inverse_transform(data)

    def set_device(self, device):
        """
        모델에서 사용할 디바이스(GPU 또는 CPU)를 설정합니다.
        
        인자:
            device: 사용할 디바이스 ('GPU' 또는 'CPU' 혹은 해당 문자열)
        
        동작:
            Generator, Discriminator, Q 네트워크가 존재하는 경우 해당 디바이스로 이동시킵니다.
        """
        self._device = device
        if self._generator is not None:
            self._generator.to(self._device)
        if self._discriminator is not None:
            self._discriminator.to(self._device)
        if self._q_network is not None:
            self._q_network.to(self._device)


In [None]:
import pandas as pd
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import f1_score

data=pd.read_csv("/content/adult.csv")
# 종속변수는 그냥 0,1로 설정
data['income'] = data['income'].replace({' <=50K': 0, ' >50K': 1})

# 데이터 프레임을 훈련용과 테스트용으로 분리
train_df, test_df = train_test_split(real_data, test_size=0.3, random_state=42)


# 훈련용 데이터로 샘플 생성
# Names of the columns that are discrete
discrete_columns = [
    'workclass',
    'education',
    'marital-status',
    'occupation',
    'relationship',
    'race',
    'sex',
    'native-country',
    'income'
]

# CTGAN 모델 초기화
embedding_dim = 128
generator_dim = (256, 256)
discriminator_dim = (256, 256)
latent_dim = 1  # latent code의 차원
batch_size = 500
epochs = 100
cuda = True
pac = 10  # pac 값을 batch_size의 약수로 설정

ctgan = CTGAN(
    embedding_dim=embedding_dim,
    generator_dim=generator_dim,
    discriminator_dim=discriminator_dim,
    latent_dim=latent_dim,
    batch_size=batch_size,
    epochs=epochs,
    pac=pac,  # pac 값을 설정
    cuda=cuda
)

In [None]:
ctgan.fit(train_df, discrete_columns)
# Create synthetic data
synthetic_data = ctgan.sample(10000)

# 범주형 컬럼 get dummies 사용
synthetic_data_get_dummies=pd.get_dummies(synthetic_data)
test_df_get_dummies=pd.get_dummies(test_df)

X_train=synthetic_data_get_dummies.drop('income',axis=1)
y_train=synthetic_data_get_dummies['income']

X_test=test_df_get_dummies.drop('income',axis=1)
y_test=test_df_get_dummies['income']


# 범주형 컬럼에서 빈도가 적은 카테고리가 테스트에 포함되지 않을수도 있으니 그 카테고리 False로 차원 맞춰주기
if X_train.shape[1]>X_test.shape[1]:
  synthetic_data_get_dummies, test_df_get_dummies = synthetic_data_get_dummies.align(test_df_get_dummies, join='outer', axis=1, fill_value=False)
elif X_train.shape[1]<X_test.shape[1]:
  test_df_get_dummies, synthetic_data_get_dummies = test_df_get_dummies.align(synthetic_data_get_dummies, join='outer', axis=1, fill_value=False)
else:
  synthetic_data_get_dummies=pd.get_dummies(synthetic_data)
  test_df_get_dummies=pd.get_dummies(test_df)


X_train=synthetic_data_get_dummies.drop('income',axis=1)
y_train=synthetic_data_get_dummies['income']

X_test=test_df_get_dummies.drop('income',axis=1)
y_test=test_df_get_dummies['income']




print(X_train.shape)
print(y_train.shape)
print(X_test.shape)
print(y_test.shape)



# 의사결정 나무 모델 초기화 및 학습
clf = DecisionTreeClassifier(random_state=42)
clf.fit(X_train, y_train)



y_pred = clf.predict(X_test)




# latent_dim : 1, epochs : 100
# F1 스코어 계산
f1 = f1_score(y_test, y_pred)
print("F1 스코어:", f1)