In [None]:
from sdv.datasets.demo import download_demo
real_data, metadata = download_demo(
    modality='single_table',
    dataset_name='fake_hotel_guests')
    

In [None]:
from sdv.lite import SingleTablePreset

synthesizer = SingleTablePreset(metadata, name='FAST_ML')
synthesizer.fit(data=real_data)

In [None]:
from typing import List, Tuple
from sdv.single_table import CTGANSynthesizer
from sklearn.mixture import GaussianMixture
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sdv.metadata import SingleTableMetadata
from sklearn.neighbors import KernelDensity
from sdv.lite import SingleTablePreset

# push + prd

# imbalanced에 data level로 해결하는 모델
class FiGen:
    def __init__(self, ratio: float, index: List[str]):
        """
        고정적으로 사용하는 값을 저장
        
        Args:
            ratio (float): small class+생성된 데이터와 large class의 비율 
            index (List[int]): 범주형, 연속형 구분하기 위한 연속형 변수의 컬럼명 인덱스       
        """
        self.result = 0
        self.ratio = ratio
        self.index = index


    def extract_middle_percent(self, data: pd.DataFrame, start: float, last:float):
        """
        데이터의 분포 중 중간 부분을 추출 
        
        Args:
            data : 입력 데이터
            start : 추출 시작 percentile 
            last : 추출 끝 percentile
        Returns:    
            데이터의 분포 중 중간 부분을 추출하여 리턴
        """
        scaler = StandardScaler()
        data_scaled = scaler.fit_transform(data.values)
        kde = KernelDensity(kernel="gaussian", bandwidth=0.5).fit(
            data_scaled
        )  ##TODO: 계산이 안터지도록 하기, gmm으로 변경
        log_prob = kde.score_samples(data_scaled)
        prob = np.exp(log_prob)
        threshold_low, threshold_high = np.percentile(prob, [start, last])
        mask = np.logical_and(prob >= threshold_low, prob <= threshold_high)
        data_middle = data[mask]

        if len(data_middle) > 0:
            return data_middle
        else:
            print("No middle 50% found, returning original data")
            return []
        
    def find_categorical(
        self, suitable_generated_small_X: pd.DataFrame, categorical_small_X: pd.DataFrame, small_X: pd.DataFrame
    ):  
        """
        생성된 연속형변수와 기존 연속형 변수의 cosine simmilarity를 기준으로 가장 가까운 기존 변수를 찾은 후 해당 변수의 범주형 값을 가져옴
        
        Args:
            suitable_generated_small_X : 생성된 적합한 small class의 연속형 변수만 있는 x 
            small_X : small class의 연속형, 범주형 변수가 모두 있는 orgin x
        Returns:
            생성된 연속 변수를 범주형 변수값이 결합된 형태로 리턴 
        """

        # Min-Max 스케일링을 위한 객체 생성
        scaler = MinMaxScaler()


        # 열별 Min-Max 스케일링 수행
        suitable_generated_small_scaled_X = pd.DataFrame(
            scaler.fit_transform(suitable_generated_small_X),
            columns=suitable_generated_small_X.columns,
        )
 
        orgin_small_non_cat_scaled_X = pd.DataFrame(
            scaler.fit_transform(small_X[self.index]),
            columns=self.index
        )

        # 데이터프레임을 numpy 배열로 변환
        array_mxn = suitable_generated_small_scaled_X.values
        array_kxn = orgin_small_non_cat_scaled_X.values
        
        # 행렬곱 수행 (mxn과 nxk로 계산)
        result_array = np.dot(array_mxn, array_kxn.T)

        # 각 행에서 최대값을 가지는 열의 인덱스를 가져와서 리스트로 만들기
        max_indices = np.argmax(result_array, axis=1).tolist()

        # 가장큰 열 인덱스가 들어있는 리스트의 인덱스에 따라 범주형 값 가져오기
        synthetic_small_X = pd.concat(
            [suitable_generated_small_scaled_X, categorical_small_X.loc[max_indices]],axis=1
        )

        return synthetic_small_X

    def suitable_judge(self, midlle_small_X:pd.DataFrame, small_X: pd.DataFrame, large_X: pd.DataFrame):
        """
           generated_x : 생성된 small class x 데이터
           small_X : 원본 small class x 데이터 
           large_X : 원본 large class x 데이터
        """
        # 연속형small x로 뽑아야함
        center_small_X = np.mean(
            small_X[self.index].values, axis=0, dtype=np.float64, out=None 
        )
        radius_small_X = np.max(
            np.linalg.norm(small_X[self.index].values - center_small_X, axis=1)
        )

        center_large_X = np.mean(
            large_X[self.index].values, axis=0, dtype=np.float64, out=None 
        )

        radius_large_X = np.max(
            np.linalg.norm(large_X[self.index].values - center_large_X, axis=1)
        )

        synthetic_sample = pd.DataFrame()  # 최종 합치기
       

        # ctgan으로 연속형 생성 부분
        metadata = SingleTableMetadata()
        metadata.detect_from_dataframe(data=midlle_small_X)
        
        synthesizer = SingleTablePreset(metadata, name='FAST_ML')
        synthesizer.fit(data=midlle_small_X)
        
        
        # 합성된 개수 / 원래 large 클래스 개수 <= ratio 만족시 그만 생성    
        
        while len(synthetic_sample) / len(large_X) < self.ratio:

            # large class의 데이터 사이즈 10배 만큼 데이터 생성
            synthetic_data = synthesizer.sample(num_rows=len(large_X))  

            synthetic_samples_to_generate = int((self.ratio - len(synthetic_sample) / len(large_X)) * len(large_X))
            if synthetic_samples_to_generate == 0:
                break  # 더 이상 생성이 필요하지 않을 경우 루프를 빠져나감
            z = synthetic_data.iloc[:synthetic_samples_to_generate]  # 벡터화된 방식으로 일괄 처리
        
            distances_small = np.linalg.norm(z.values[:, np.newaxis, :] - center_small_X, axis=2)
            distances_large = np.linalg.norm(z.values[:, np.newaxis, :] - center_large_X, axis=2)
        
            small_condition = distances_small < radius_small_X
            large_condition = distances_large < radius_large_X

            # 생성된 small class 데이터가 small, large class 중 small에 가까운지, small class의 지름을 넘지는 않는지
            condition = np.logical_and(small_condition, distances_small < distances_large)
        
            synthetic_sample = pd.concat([synthetic_sample, z[condition]])
            
        return synthetic_sample.reset_index(drop=True)
    
    
    def generate_synthetic(
        self, small_X: pd.DataFrame, large_X: pd.DataFrame, small_Y: pd.DataFrame, large_Y: pd.DataFrame
    ) -> Tuple[pd.DataFrame, pd.Series]:
        """
        생성된 데이터셋 + 기존 데이터셋을 합쳐 통합 데이터셋을 생성
        
        Args:
            small_X (pd.DataFrame): small class의 x
            large_X (pd.DataFrame): large class의 x
        Returns:
            생성된 데이터셋 + 기존 데이터셋을 합쳐 통합 데이터셋을 리턴
        """

        # Nan 값 제거 요청 
        assert not large_X.isnull().values.any(), "large_X 입력 데이터에 NaN 값이 포함되어 있습니다." 
        assert not small_X.isnull().values.any(), "small_X 입력 데이터에 NaN 값이 포함되어 있습니다."    
 

        # 연속형 변수만 가져오는 부분
        continue_small_X = small_X[self.index]
        continue_large_X = large_X[self.index]

        # 범주형 변수만 가져오는 부분
        categorical_small_X = small_X[list(set(small_X.columns) - set(self.index))]
        categorical_large_X = large_X[list(set(small_X.columns) - set(self.index))]

    
        # 상위 n% 필터링 부분
        midlle_small_X = self.extract_middle_percent(
            continue_small_X, 25, 75
        )  ##TODO: 추후에 하이퍼 파라미터로 뺄 수 있음
        midlle_large_X = self.extract_middle_percent(
            continue_large_X, 15, 85
        )  ##TODO: 추후에 하이퍼 파라미터로 뺄 수 있음
        
        # 연속형 데이터 생성 및 데이터 적합 판단

        suitable_generated_small_X = self.suitable_judge(midlle_small_X, small_X, large_X)
      
        # 코사인 유사도 기반으로 가장 가까운 기존 변수의 범주형 변수 값 가져오기
  
        synthetic_small_X = self.find_categorical(
            suitable_generated_small_X, categorical_small_X, small_X 
        )
       
        # small class와 large class 합치기
        origin_small_x = pd.concat(
            [midlle_small_X, categorical_small_X.loc[midlle_small_X.index]], axis=1
        )
   
        small_total_x = pd.concat([synthetic_small_X, origin_small_x], axis=0)
        small_total_x["target"] = small_Y[0]

        origin_large_x = pd.concat(
            [midlle_large_X, categorical_large_X.loc[midlle_large_X.index]], axis=1
        )

        origin_large_x["target"] = small_Y[0]
        total = pd.concat([small_total_x, origin_large_x], axis=0)
        return total.drop(columns=["target"]), total["target"]
    
    
    def fit(
        self,
        small_X: pd.DataFrame,
        small_Y: pd.DataFrame,
        large_X: pd.DataFrame,
        large_Y: pd.DataFrame        
    ):
        """
        데이터를 학습 시키는 함수
        Args:
            small_X (pd.DataFrame): small class의 x
            small_Y (pd.DataFrame): small class의 y
            large_X (pd.DataFrame): large class의 x
            large_Y (pd.DataFrame): large class의 y
        Returns:
            Tuple[pd.DataFrame, pd.DataFrame]: synthetic X, y
        
        """
        # 합성+ 기존 data set 생성
        synthetic_X, synthetic_Y = self.generate_synthetic(
            small_X, large_X, small_Y, large_Y
        )
        return synthetic_X, synthetic_Y


In [None]:
GEN = FiGen(0.3,['amenities_fee','room_rate'])

In [None]:
# y = has_rewards
real_data = real_data.dropna(axis=0)
small_X = real_data[real_data['has_rewards'] == True]
small_Y = real_data[real_data['has_rewards'] == True].iloc[:, [1]]
large_X = real_data[real_data['has_rewards'] == False]
large_Y = real_data[real_data['has_rewards'] == False].iloc[:, [1]]

# 생성된 것 중에 적합헌 것이 없어 넘어가는 과장이 원활히 되지 않음

In [None]:
small_X 

In [None]:
synthetic_X, synthetic_Y, test = GEN.fit(small_X, small_Y, large_X, large_Y)

In [None]:
con_index = ['amenities_fee','room_rate']

In [None]:

continue_small_X = small_X[con_index]
categorical_small_X = small_X[list(set(small_X.columns) - set(con_index))]

In [None]:
def find_categorical(
        suitable_generated_small_X: pd.DataFrame, categorical_small_X: pd.DataFrame, small_X: pd.DataFrame
    ):  
        """
        생성된 연속형변수와 기존 연속형 변수의 cosine simmilarity를 기준으로 가장 가까운 기존 변수를 찾은 후 해당 변수의 범주형 값을 가져옴
        
        Args:
            suitable_generated_small_X : 생성된 적합한 small class의 연속형 변수만 있는 x 
            small_X : small class의 연속형, 범주형 변수가 모두 있는 orgin x
        Returns:
            생성된 연속 변수를 범주형 변수값이 결합된 형태로 리턴 
        """

        # Min-Max 스케일링을 위한 객체 생성
        scaler = MinMaxScaler()

        # 열별 Min-Max 스케일링 수행
        suitable_generated_small_scaled_X = pd.DataFrame(
            scaler.fit_transform(suitable_generated_small_X),
            columns=suitable_generated_small_X.columns,
        )
 
        orgin_small_non_cat_scaled_X = pd.DataFrame(
            scaler.fit_transform(small_X[con_index]),
            columns=con_index
        )

        # 데이터프레임을 numpy 배열로 변환
        array_mxn = suitable_generated_small_scaled_X.values
        array_kxn = orgin_small_non_cat_scaled_X.values
        
        # 행렬곱 수행 (mxn과 nxk로 계산)
        result_array = np.dot(array_mxn, array_kxn.T)
        # 각 행에서 최대값을 가지는 열의 인덱스를 가져와서 리스트로 만들기
        max_indices = np.argmax(result_array, axis=1).tolist()

        # 가장큰 열 인덱스가 들어있는 리스트의 인덱스에 따라 범주형 값 가져오기
        synthetic_small_X = pd.concat(
            [suitable_generated_small_scaled_X, categorical_small_X.loc[max_indices]],axis=1
        )

        return synthetic_small_X

In [None]:
synthetic_small_X = find_categorical(
            continue_small_X.iloc[:10], categorical_small_X, small_X )

In [None]:
continue_small_X.iloc[:10]

In [None]:
test_np = [1.00000000e+00, 3.30228798e-01, 0.00000000e+00, 6.15400836e-02,
  3.57431554e-02, 3.53530383e-01, 5.13478368e-02, 4.87646294e-01,
  2.10065722e-01, 1.11236074e-01, 1.02273925e-01, 1.69402172e-02,
  2.04336977e-01, 4.98682037e-01, 3.27030542e-01, 3.51456788e-02,
  1.79594419e-02, 6.45661266e-01, 4.22661934e-01, 4.27617474e-01,
  2.73538818e-01, 5.15165360e-01, 2.39904404e-01, 4.37141953e-01,
  4.64801603e-01, 2.08976206e-01, 4.80722595e-01, 4.31413208e-01,
  4.32362141e-01, 4.19955716e-01, 4.19955716e-01, 4.76926862e-02,
  2.90654764e-02, 2.56704038e-01, 5.48342881e-01, 2.53013742e-01,
  4.35384669e-01, 6.60738762e-03, 3.07348961e-01, 3.12796542e-03,
  7.83397181e-02, 3.18068393e-01, 3.01303905e-01, 4.25122131e-01,
  3.17927811e-01, 7.92640495e-01, 4.77664921e-01, 4.22837662e-01,
  4.24067761e-01, 4.55734018e-01, 5.34249464e-01, 4.29761361e-01,
  2.47495870e-01]

max(test_np)

In [None]:
real_data

In [None]:
# 가장큰 열 인덱스가 들어있는 리스트의 인덱스에 따라 범주형 값 가져오기 -> 값이 중복되는 경우에 대차 Ex ) 0,0,0,0, 