## 파일 목적  
Surrogate 모델에 GA(유전 알고리즘)을 적용하기 전에, 실제 데이터를 불러와 input과 output을 확인하면서 end-to-end 학습이 정상적으로 이루어지는지 검증하는 것

## 데이터 불러오기

In [None]:
import fireducks.pandas as pd
# Load the preprocessed concrete dataset (path is relative to this notebook).
df = pd.read_csv('../data/concrete_processed.csv')

# Preview the first rows to sanity-check the columns.
df.head()

In [None]:
import torch
import sklearn

In [None]:
# Name of the column used as the regression label; all other columns are features.
target = 'strength'

In [None]:
def split_data(df, target, test_size=0.2, random_state=42):
    """Split a frame into train/test features and labels.

    Args:
        df: Table containing the feature columns and the ``target`` column.
        target: Name of the label column.
        test_size: Fraction of rows placed in the test split.
        random_state: Seed forwarded to ``train_test_split`` for reproducibility.

    Returns:
        Tuple ``(X_train, X_test, y_train, y_test)``.
    """
    # Import the submodule explicitly: a bare ``import sklearn`` does not
    # necessarily load ``sklearn.model_selection`` as an attribute.
    from sklearn.model_selection import train_test_split

    features = df.drop(columns=[target])
    labels = df[target]
    X_train, X_test, y_train, y_test = train_test_split(
        features, labels, test_size=test_size, random_state=random_state
    )
    return X_train, X_test, y_train, y_test

In [None]:
# Hold out a test split using the defaults of split_data (test_size=0.2, random_state=42).
X_train, X_test, y_train, y_test = split_data(df, target)

In [None]:
# Inspect the container types returned by the split.
print(type(X_train))
print(type(X_test))
print(type(y_train))
print(type(y_test))

## Surrogate Model(simpleNN)과 Search Model(GA - deap 라이브러리) 연결 확인

In [None]:
import numpy as np
class SimpleNN_dataloader(torch.utils.data.Dataset):
    """Map-style dataset yielding ``(features, target)`` pairs as float32 tensors.

    Despite its name, this is a ``Dataset``; wrap it in a ``DataLoader`` to batch.
    """

    def __init__(self, X, y):
        # Copy both inputs into NumPy arrays so rows can be indexed by position.
        self.X, self.y = np.array(X), np.array(y)

    def __len__(self):
        n_rows = len(self.X)
        return n_rows

    def __getitem__(self, idx):
        features = torch.tensor(self.X[idx], dtype=torch.float32)
        # Add a leading axis so a scalar label comes out with shape (1,).
        label = torch.tensor(self.y[idx], dtype=torch.float32).unsqueeze(0)
        return features, label


def simpleNN_load_data(X_train, X_test, y_train, y_test, batch_size=10):
    """Wrap the train/test splits in ``DataLoader`` objects.

    Args:
        X_train, X_test: Feature tables for the train and test splits.
        y_train, y_test: Matching label vectors.
        batch_size: Number of samples per batch for both loaders (default 10).

    Returns:
        Tuple ``(train_loader, test_loader)``. Only the training loader shuffles,
        so test predictions keep the row order of ``X_test``.
    """
    train_data = SimpleNN_dataloader(X_train, y_train)
    test_data = SimpleNN_dataloader(X_test, y_test)

    train_loader = torch.utils.data.DataLoader(train_data, batch_size=batch_size, shuffle=True)
    test_loader = torch.utils.data.DataLoader(test_data, batch_size=batch_size, shuffle=False)
    return train_loader, test_loader

In [None]:
# Build the PyTorch loaders for the NN from the split data.
train_loader, test_loader = simpleNN_load_data(X_train, X_test, y_train, y_test)

In [None]:
import torch
from sklearn.metrics import root_mean_squared_error, mean_absolute_error, r2_score
import numpy as np

class simpleNN_model(torch.nn.Module):
    """Small fully connected regressor: input -> 16 -> 32 -> output with ReLU.

    Args:
        input_size: Number of input features.
        output_size: Number of regression outputs (default 1).
    """

    def __init__(self, input_size, output_size=1):
        super().__init__()
        self.fc1 = torch.nn.Linear(input_size, 16)
        self.fc2 = torch.nn.Linear(16, 32)
        self.fc3 = torch.nn.Linear(32, output_size)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        # No activation on the last layer: the output is an unbounded regression value.
        return self.fc3(x)

    @property
    def device(self):
        """Device the parameters currently live on.

        Derived from the parameters on every access, so it is available before
        any ``.to()`` call and stays correct after ``.to()``, ``.cuda()`` or
        ``.cpu()``.
        """
        return next(self.parameters()).device


def simpleNN_train(train_loader, val_loader, epochs=200, lr=0.001, device=None):
    """Train a ``simpleNN_model`` with Adam on MSE loss.

    Args:
        train_loader: Loader over a dataset exposing a 2-D ``X`` array; its
            second dimension fixes the network input size.
        val_loader: Loader used for the periodic validation loss.
        epochs: Number of passes over ``train_loader`` (default 200).
        lr: Adam learning rate (default 0.001).
        device: Device to train on. ``None`` picks CUDA when available and
            falls back to CPU otherwise.

    Returns:
        The trained model, left on ``device``.
    """
    if device is None:
        # Fall back to CPU so the notebook also runs on machines without a GPU.
        device = 'cuda' if torch.cuda.is_available() else 'cpu'
    device = torch.device(device)

    model = simpleNN_model(input_size=train_loader.dataset.X.shape[1])
    model.to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    loss_fn = torch.nn.MSELoss()

    for epoch in range(epochs):
        model.train()
        train_loss = 0.0

        for data, target in train_loader:
            data, target = data.to(device), target.to(device)
            optimizer.zero_grad()
            loss = loss_fn(model(data), target)
            loss.backward()
            optimizer.step()
            train_loss += loss.item()

        # Report train and validation loss every 10th epoch.
        if epoch % 10 == 0:
            print(f"Epoch {epoch}, Loss: {train_loss/max(len(train_loader), 1):.4f}")

            model.eval()
            val_loss = 0.0
            # Validation needs no gradients; skipping them saves memory and time.
            with torch.no_grad():
                for data, target in val_loader:
                    data, target = data.to(device), target.to(device)
                    val_loss += loss_fn(model(data), target).item()
            print(f"Validation Loss: {val_loss/max(len(val_loader), 1):.4f}")

    return model

In [None]:
def simpleNN_predict(model, X_test):
    """Run inference with a torch model and return NumPy predictions.

    Args:
        model: A ``torch.nn.Module``; it is switched to eval mode.
        X_test: Either a tensor of inputs, or a ``DataLoader`` yielding
            ``(data, target)`` batches (targets are ignored).

    Returns:
        For a tensor input, the raw model output as a NumPy array.
        For a ``DataLoader``, all batch outputs concatenated and squeezed.

    Raises:
        TypeError: If ``X_test`` is neither a tensor nor a ``DataLoader``.
    """
    model.eval()

    # Read the device from the parameters so any nn.Module works, not only
    # models that expose a custom ``device`` attribute.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device('cpu')

    if isinstance(X_test, torch.Tensor):
        with torch.no_grad():
            output = model(X_test.to(device))
        # Copy back to host memory before converting to NumPy.
        return output.cpu().numpy()

    if isinstance(X_test, torch.utils.data.DataLoader):
        batches = []
        with torch.no_grad():
            for data, _ in X_test:
                batches.append(model(data.to(device)).cpu().numpy())
        if not batches:
            # np.concatenate rejects an empty list; an empty loader yields no predictions.
            return np.empty(0, dtype=np.float32)
        return np.concatenate(batches, axis=0).squeeze()

    raise TypeError(
        f"X_test must be a torch.Tensor or DataLoader, got {type(X_test).__name__}"
    )

In [None]:
# Train the NN; the test loader doubles as the validation set here.
# NOTE(review): this rebinds the name `simpleNN_model` from the class to the trained
# instance. `simpleNN_train` calls `simpleNN_model(input_size=...)`, so re-running this
# cell without first re-running the class definition is expected to fail.
simpleNN_model = simpleNN_train(train_loader=train_loader, val_loader=test_loader)

In [None]:
import numpy as np
import torch

In [None]:
# Alias for the held-out labels, i.e. the target values the search should reproduce.
y_target = y_test

In [None]:
# Column-wise sum of the training features.
x_sum = np.sum(X_train, axis=0)
x_sum

In [None]:
# Number of training rows.
train_len = X_train.shape[0]

In [None]:
# Column-wise mean of the training features (sum divided by row count).
x_mean = x_sum / train_len
x_mean

In [None]:
# Column-wise minimum and maximum of the training features.
x_min = np.min(X_train, axis=0)
x_max = np.max(X_train, axis=0)

In [None]:
# Show the per-feature value range of the training data.
print(x_min)
print(x_max)

In [None]:
# Dimensions of the held-out feature table.
X_test.shape

In [None]:
import random
from tqdm import tqdm
from deap import base, creator, tools


def ga_deap_search(model, pred_func, X_train, X_test, y_test,
                   pop_size=50, n_generations=100, max_targets=1, verbose=True):
    """Search, per target value, for an input whose prediction matches it (DEAP GA).

    For each target in ``y_test`` a genetic algorithm maximises
    ``-(prediction - target)**2`` over candidate inputs bounded by the
    per-feature min/max of ``X_train``. The whole population is scored with a
    single batched call to ``pred_func``.

    Args:
        model: Trained predictor forwarded to ``pred_func``.
        pred_func: Callable invoked as ``pred_func(model=..., X_test=tensor)``
            that returns one prediction per row.
        X_train: Training features; only used to derive the search bounds and
            the number of features.
        X_test: Unused; kept for interface compatibility.
        y_test: Target values to invert.
        pop_size: Individuals per generation (default 50).
        n_generations: Number of generations (default 100).
        max_targets: How many leading targets to process. Defaults to 1, which
            suits a quick connectivity check; ``None`` processes all of them.
        verbose: Print the shape/type diagnostics of every batched evaluation.

    Returns:
        Array of shape ``(n_targets, n_features)`` holding the best input found
        for each processed target.

    Raises:
        ValueError: If ``pop_size`` is smaller than 1.
    """
    if pop_size < 1:
        raise ValueError('pop_size must be at least 1')

    lower = np.asarray(np.min(X_train, axis=0), dtype=float).reshape(-1).tolist()
    upper = np.asarray(np.max(X_train, axis=0), dtype=float).reshape(-1).tolist()
    n_features = len(lower)
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    targets = np.asarray(y_test, dtype=float).reshape(-1)
    if max_targets is not None:
        targets = targets[:max_targets]

    # DEAP's creator registry is global to the process; create each class once
    # instead of on every loop iteration or function call.
    if not hasattr(creator, 'FitnessMax'):
        creator.create('FitnessMax', base.Fitness, weights=(1.0,))
    if not hasattr(creator, 'Individual'):
        creator.create('Individual', list, fitness=creator.FitnessMax)

    def sample_genes():
        # One independent draw per feature, each inside its own training range.
        return [random.uniform(lo, hi) for lo, hi in zip(lower, upper)]

    def clip_to_bounds(individual):
        # Crossover and mutation can leave the training range; pull genes back in.
        for i, (lo, hi) in enumerate(zip(lower, upper)):
            individual[i] = min(max(individual[i], lo), hi)

    def evaluate(population, gt_y):
        # Score the whole population with a single batched prediction.
        x_tensors = torch.tensor(
            [list(ind) for ind in population], dtype=torch.float32
        ).reshape(-1, n_features).to(device)
        if verbose:
            print('x_tensors shape : ', x_tensors.shape)
            print('x_tensors type : ', type(x_tensors))

        with torch.no_grad():
            y_pred = pred_func(model=model, X_test=x_tensors)
        y_pred_tensor = torch.as_tensor(y_pred, dtype=torch.float32)
        if verbose:
            print('y pred shape : ', y_pred_tensor.shape)
            print('y pred type : ', type(y_pred_tensor))

        # Negative squared error: higher is better, matching FitnessMax.
        scores = (-(y_pred_tensor.reshape(-1) - gt_y) ** 2).tolist()
        for ind, score in zip(population, scores):
            ind.fitness.values = (score,)

    toolbox = base.Toolbox()
    toolbox.register('individual', tools.initIterate, creator.Individual, sample_genes)
    toolbox.register('population', tools.initRepeat, list, toolbox.individual)
    # Tournament selection, so fitter individuals are more likely to reproduce.
    toolbox.register('select', tools.selTournament, tournsize=3)
    toolbox.register('mate', tools.cxBlend, alpha=0.5)
    # NOTE(review): sigma is in raw feature units; if the features differ widely
    # in scale a per-feature sigma may work better -- TODO confirm.
    toolbox.register('mutate', tools.mutGaussian, mu=0, sigma=1, indpb=0.2)

    res = []
    for gt_y in tqdm(targets):
        gt_y = float(gt_y)
        population = toolbox.population(n=pop_size)
        if verbose:
            print('population type', type(population))
        # Remembers the best individual seen in any generation.
        hall_of_fame = tools.HallOfFame(1)

        for _ in range(n_generations):
            evaluate(population, gt_y)
            hall_of_fame.update(population)

            selected = toolbox.select(population, k=len(population))
            offspring = [toolbox.clone(ind) for ind in selected]

            # Crossover on consecutive pairs.
            for first, second in zip(offspring[::2], offspring[1::2]):
                if random.random() < 0.7:
                    toolbox.mate(first, second)

            # Mutation.
            for child in offspring:
                if random.random() < 0.2:
                    toolbox.mutate(child)

            for child in offspring:
                clip_to_bounds(child)
                # Genes may have changed, so the cloned fitness is stale.
                del child.fitness.values

            population[:] = offspring

        # The last offspring have no fitness yet; score them before picking a winner.
        evaluate(population, gt_y)
        hall_of_fame.update(population)

        res.append(np.array(hall_of_fame[0], dtype=float).reshape(1, n_features))

    if not res:
        return np.empty((0, n_features))
    return np.concatenate(res, axis=0)
    

In [None]:
# Run the DEAP search with the trained NN as the predictor.
res = ga_deap_search(simpleNN_model, simpleNN_predict, X_train, X_test, y_test)

"""
x_tensors shape :  torch.Size([50, 8])
x_tensors type :  <class 'torch.Tensor'>
y pred shape :  torch.Size([50, 1])
y pred type :  <class 'torch.Tensor'>
"""

## Surrogate Model(simpleNN)과 Search Model(GA - pygmo 라이브러리) 연결 확인

In [None]:
import torch
import numpy as np
import pygmo as pg
from tqdm import tqdm


def ga_pygmo_search(model, pred_func, X_train, X_test, y_test,
                    pop_size=50, n_generations=100, max_targets=1,
                    evolve=False, verbose=True):
    """Search, per target value, for an input whose prediction matches it (pygmo).

    pygmo minimises fitness, so the objective is the squared error
    ``(prediction - target)**2`` and ``champion_x`` is the candidate with the
    smallest error. Candidates are bounded by the per-feature min/max of
    ``X_train``.

    Args:
        model: Trained predictor forwarded to ``pred_func``.
        pred_func: Callable invoked as ``pred_func(model=..., X_test=tensor)``
            that returns one prediction per row.
        X_train: Training features; only used to derive the search bounds and
            the number of features.
        X_test: Unused; kept for interface compatibility.
        y_test: Target values to invert.
        pop_size: Number of individuals in the population (default 50).
        n_generations: Generations for ``pg.sga`` when ``evolve`` is true.
        max_targets: How many leading targets to process. Defaults to 1, which
            suits a quick connectivity check; ``None`` processes all of them.
        evolve: When true, run ``pg.sga`` on the batch-evaluated population.
            The default only scores the initial random population.
        verbose: Print the shape/type diagnostics of the batched evaluation.

    Returns:
        Array of shape ``(n_targets, n_features)`` holding the best input found
        for each processed target.

    Raises:
        ValueError: If ``pop_size`` is smaller than 1.
    """
    if pop_size < 1:
        raise ValueError('pop_size must be at least 1')

    lower = np.asarray(np.min(X_train, axis=0), dtype=float).reshape(-1).tolist()
    upper = np.asarray(np.max(X_train, axis=0), dtype=float).reshape(-1).tolist()
    n_features = len(lower)
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    targets = np.asarray(y_test, dtype=float).reshape(-1)
    if max_targets is not None:
        targets = targets[:max_targets]

    class SurrogateInversionProblem:
        """pygmo user-defined problem: squared error between prediction and target."""

        def __init__(self, gt_y):
            self.gt_y = gt_y

        def fitness(self, x):
            # Add the batch dimension expected by the predictor.
            x_tensor = torch.tensor(x, dtype=torch.float32).reshape(1, n_features).to(device)
            with torch.no_grad():
                y_pred = pred_func(model=model, X_test=x_tensor)
            y_hat = float(torch.as_tensor(y_pred, dtype=torch.float32).reshape(-1)[0])
            return [(y_hat - self.gt_y) ** 2]

        def get_bounds(self):
            return (lower, upper)

    def batch_evaluate_population(pop, gt_y):
        # Score every individual with one batched prediction and rebuild the
        # population with those fitness values.
        x_tensors = torch.tensor(
            pop.get_x(), dtype=torch.float32
        ).reshape(-1, n_features).to(device)
        if verbose:
            print('batch x_tensors shape : ', x_tensors.shape)
            print('batch x_tensors type : ', type(x_tensors))

        with torch.no_grad():
            y_preds = pred_func(model=model, X_test=x_tensors)
        y_pred_tensor = torch.as_tensor(y_preds, dtype=torch.float32)
        if verbose:
            print('batch y_pred_tensor shape : ', y_pred_tensor.shape)
            print('batch y_pred_tensor type : ', type(y_pred_tensor))

        # Flatten first so each fitness is a plain Python float, not a 1-element array.
        errors = ((y_pred_tensor.reshape(-1) - gt_y) ** 2).tolist()

        new_pop = pg.population(pop.problem)
        for individual, error in zip(pop.get_x(), errors):
            new_pop.push_back(np.asarray(individual, dtype=float).reshape(-1), [error])
        return new_pop

    res = []
    for gt_y in tqdm(targets):
        gt_y = float(gt_y)
        prob = pg.problem(SurrogateInversionProblem(gt_y))
        pop = pg.population(prob, size=pop_size)

        # Batched evaluation of the initial population.
        pop = batch_evaluate_population(pop, gt_y)

        if evolve:
            algo = pg.algorithm(
                pg.sga(gen=n_generations, cr=0.7, eta_c=1.0, m=0.2, param_m=1.0)
            )
            pop = algo.evolve(pop)

        res.append(np.array(pop.champion_x, dtype=float).reshape(1, n_features))

    if not res:
        return np.empty((0, n_features))
    return np.concatenate(res, axis=0)

In [None]:
# Run the pygmo search with the trained NN as the predictor.
res = ga_pygmo_search(simpleNN_model, simpleNN_predict, X_train, X_test, y_test)
"""
batch x_tensors shape :  torch.Size([50, 8])
batch x_tensors type :  <class 'torch.Tensor'>
batch y_pred_tensor shape :  torch.Size([50, 1])
batch y_pred_tensor type :  <class 'torch.Tensor'>
"""

## Surrogate Model(LightGBM)과 Search Model(GA - deap 라이브러리) 연결 확인

In [None]:
import lightgbm as lgb
import numpy as np

def lightgbm_train(train_data, val_data, params = None):
    """Fit a LightGBM regressor with early stopping.

    Args:
        train_data: ``lgb.Dataset`` used for fitting.
        val_data: ``lgb.Dataset`` monitored for early stopping.
        params: Optional LightGBM parameter dict; when ``None`` the defaults
            below are used.

    Returns:
        The booster returned by ``lgb.train``.
    """
    default_params = dict(
        objective="regression",    # regression task
        metric="rmse",             # evaluation metric
        boosting_type="gbdt",      # boosting method
        learning_rate=0.05,        # learning rate
        num_leaves=31,             # leaves per tree
        feature_fraction=0.8,      # feature sampling ratio
        bagging_fraction=0.8,      # row sampling ratio
        bagging_freq=5,            # bagging frequency
        min_data_in_leaf=20,       # minimum samples per leaf
        verbosity=-1,              # keep LightGBM quiet
    )
    chosen_params = default_params if params is None else params

    callbacks = [
        lgb.log_evaluation(period=100),          # log every 100 rounds
        lgb.early_stopping(stopping_rounds=50),  # stop after 50 rounds without improvement
    ]

    return lgb.train(
        chosen_params,
        train_data,
        num_boost_round=1000,
        valid_sets=[train_data, val_data],
        valid_names=["train", "valid"],
        callbacks=callbacks,
    )

def lightgbm_evaluate(model, train_data, val_data):
    """Compute RMSE, MAE and R^2 of ``model`` on the validation set.

    Args:
        model: Fitted booster exposing ``predict`` and ``best_iteration``.
        train_data: Dataset whose labels provide the baseline mean for R^2.
        val_data: Dataset providing the evaluation features and labels.

    Returns:
        Tuple ``(rmse, mae, r2)``.
    """
    train_labels = train_data.get_label()
    val_features = val_data.get_data()
    val_labels = val_data.get_label()

    predictions = model.predict(val_features, num_iteration=model.best_iteration)
    residuals = val_labels - predictions

    mae = np.mean(np.abs(residuals))
    sse = np.sum(np.square(residuals))
    # NOTE(review): the baseline is the *training* label mean, so this differs
    # from sklearn's r2_score, which uses the evaluation-set mean -- confirm intent.
    sst = np.sum(np.square(val_labels - train_labels.mean()))
    r2 = 1 - sse/sst

    rmse = np.sqrt(np.mean(np.square(residuals)))
    return rmse, mae, r2



def lightgbm_predict(model, X_test):
    """Predict with a LightGBM booster at its best iteration.

    Args:
        model: Fitted booster exposing ``predict`` and ``best_iteration``.
        X_test: Feature rows. A torch tensor is detached, moved to the CPU and
            converted to NumPy; any other input is passed to ``model.predict``
            unchanged.

    Returns:
        Whatever ``model.predict`` returns for the given rows.
    """
    if isinstance(X_test, torch.Tensor):
        # LightGBM cannot consume torch tensors, in particular not CUDA ones.
        X_test = X_test.detach().cpu().numpy()

    return model.predict(X_test, num_iteration=model.best_iteration)

In [None]:
import lightgbm as lgb


def lightgbm_load_data(X_train, X_test, y_train, y_test):
    """Wrap the train/test splits in ``lgb.Dataset`` objects.

    Both datasets are created with ``free_raw_data=False``; the validation set
    references the training set.

    Returns:
        Tuple ``(train_data, val_data)``.
    """
    keep_raw = {"free_raw_data": False}

    train_data = lgb.Dataset(X_train, label=y_train, **keep_raw)
    val_data = lgb.Dataset(X_test, label=y_test, reference=train_data, **keep_raw)
    return train_data, val_data

In [None]:
# Build the LightGBM datasets; the test split serves as the validation set.
train_data, val_data = lightgbm_load_data(X_train, X_test, y_train, y_test)

In [None]:
# Train the LightGBM model with the default parameters of lightgbm_train.
lightgbm_model = lightgbm_train(train_data, val_data)

In [None]:
import numpy as np
import torch
# model.eval()

In [None]:
# Alias for the held-out labels, i.e. the target values the search should reproduce.
y_target = y_test

In [None]:
import random
from tqdm import tqdm
from deap import base, creator, tools


def ga_deap_search(model, pred_func, X_train, X_test, y_test,
                   pop_size=50, n_generations=100, max_targets=1, verbose=True):
    """Search, per target value, for an input whose prediction matches it (DEAP GA).

    For each target in ``y_test`` a genetic algorithm maximises
    ``-(prediction - target)**2`` over candidate inputs bounded by the
    per-feature min/max of ``X_train``. The whole population is scored with a
    single batched call to ``pred_func``.

    Args:
        model: Trained predictor forwarded to ``pred_func``.
        pred_func: Callable invoked as ``pred_func(model=..., X_test=tensor)``
            that returns one prediction per row.
        X_train: Training features; only used to derive the search bounds and
            the number of features.
        X_test: Unused; kept for interface compatibility.
        y_test: Target values to invert.
        pop_size: Individuals per generation (default 50).
        n_generations: Number of generations (default 100).
        max_targets: How many leading targets to process. Defaults to 1, which
            suits a quick connectivity check; ``None`` processes all of them.
        verbose: Print the shape/type diagnostics of every batched evaluation.

    Returns:
        Array of shape ``(n_targets, n_features)`` holding the best input found
        for each processed target.

    Raises:
        ValueError: If ``pop_size`` is smaller than 1.
    """
    if pop_size < 1:
        raise ValueError('pop_size must be at least 1')

    lower = np.asarray(np.min(X_train, axis=0), dtype=float).reshape(-1).tolist()
    upper = np.asarray(np.max(X_train, axis=0), dtype=float).reshape(-1).tolist()
    n_features = len(lower)
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    targets = np.asarray(y_test, dtype=float).reshape(-1)
    if max_targets is not None:
        targets = targets[:max_targets]

    # DEAP's creator registry is global to the process; create each class once
    # instead of on every loop iteration or function call.
    if not hasattr(creator, 'FitnessMax'):
        creator.create('FitnessMax', base.Fitness, weights=(1.0,))
    if not hasattr(creator, 'Individual'):
        creator.create('Individual', list, fitness=creator.FitnessMax)

    def sample_genes():
        # One independent draw per feature, each inside its own training range.
        return [random.uniform(lo, hi) for lo, hi in zip(lower, upper)]

    def clip_to_bounds(individual):
        # Crossover and mutation can leave the training range; pull genes back in.
        for i, (lo, hi) in enumerate(zip(lower, upper)):
            individual[i] = min(max(individual[i], lo), hi)

    def evaluate(population, gt_y):
        # Score the whole population with a single batched prediction.
        x_tensors = torch.tensor(
            [list(ind) for ind in population], dtype=torch.float32
        ).reshape(-1, n_features).to(device)
        if verbose:
            print('x_tensors shape : ', x_tensors.shape)
            print('x_tensors type : ', type(x_tensors))

        with torch.no_grad():
            y_pred = pred_func(model=model, X_test=x_tensors)
        y_pred_tensor = torch.as_tensor(y_pred, dtype=torch.float32)
        if verbose:
            print('y_pred_tensor shape : ', y_pred_tensor.shape)
            print('y_pred_tensor type : ', type(y_pred_tensor))

        # Negative squared error: higher is better, matching FitnessMax.
        scores = (-(y_pred_tensor.reshape(-1) - gt_y) ** 2).tolist()
        for ind, score in zip(population, scores):
            ind.fitness.values = (score,)

    toolbox = base.Toolbox()
    toolbox.register('individual', tools.initIterate, creator.Individual, sample_genes)
    toolbox.register('population', tools.initRepeat, list, toolbox.individual)
    # Tournament selection, so fitter individuals are more likely to reproduce.
    toolbox.register('select', tools.selTournament, tournsize=3)
    toolbox.register('mate', tools.cxBlend, alpha=0.5)
    # NOTE(review): sigma is in raw feature units; if the features differ widely
    # in scale a per-feature sigma may work better -- TODO confirm.
    toolbox.register('mutate', tools.mutGaussian, mu=0, sigma=1, indpb=0.2)

    res = []
    for gt_y in tqdm(targets):
        gt_y = float(gt_y)
        population = toolbox.population(n=pop_size)
        if verbose:
            print('population type', type(population))
        # Remembers the best individual seen in any generation.
        hall_of_fame = tools.HallOfFame(1)

        for _ in range(n_generations):
            evaluate(population, gt_y)
            hall_of_fame.update(population)

            selected = toolbox.select(population, k=len(population))
            offspring = [toolbox.clone(ind) for ind in selected]

            # Crossover on consecutive pairs.
            for first, second in zip(offspring[::2], offspring[1::2]):
                if random.random() < 0.7:
                    toolbox.mate(first, second)

            # Mutation.
            for child in offspring:
                if random.random() < 0.2:
                    toolbox.mutate(child)

            for child in offspring:
                clip_to_bounds(child)
                # Genes may have changed, so the cloned fitness is stale.
                del child.fitness.values

            population[:] = offspring

        # The last offspring have no fitness yet; score them before picking a winner.
        evaluate(population, gt_y)
        hall_of_fame.update(population)

        res.append(np.array(hall_of_fame[0], dtype=float).reshape(1, n_features))

    if not res:
        return np.empty((0, n_features))
    return np.concatenate(res, axis=0)
    

In [None]:
# Run the DEAP search with the trained LightGBM model as the predictor.
res = ga_deap_search(lightgbm_model, lightgbm_predict, X_train, X_test, y_test)
"""
x_tensors shape :  torch.Size([50, 8])
x_tensors type :  <class 'torch.Tensor'>
y pred shape :  torch.Size([50])
y pred type :  <class 'torch.Tensor'>
"""