In [None]:
import pandas as pd
import numpy as np
from gensim.models import Word2Vec # categorical feature to vectors
from random import shuffle
import copy
import re

In [None]:
# Read the three CSV inputs from the working directory, in this order.
recipe, train, test = (
    pd.read_csv(csv_path) for csv_path in ('recipe.csv', 'train.csv', 'test.csv')
)

In [None]:
# Discard recipe rows missing any of the three required columns, then keep only
# the first row for each recipe name (CKG_NM is the merge key used later).
recipe = (
    recipe
    .dropna(subset=['CKG_NM', 'CKG_MTRL_CN', 'CKG_INBUN_NM'])
    .drop_duplicates(subset=['CKG_NM'])
)

In [None]:
# Column glossary (translated from the original note):
#   CKG_MTH_ACTO_NM  - cooking method
#   CKG_MTRL_ACTO_NM - cooking ingredient
#   CKG_KND_ACTO_NM  - dish type
#   CKG_MTRL_CN      - ingredient contents
recipe_columns = [
    'CKG_NM',
    'CKG_STA_ACTO_NM',
    'CKG_MTH_ACTO_NM',
    'CKG_MTRL_ACTO_NM',
    'CKG_KND_ACTO_NM',
    'CKG_MTRL_CN',
]
recipe = recipe[recipe_columns]
recipe.head()

In [None]:
def remove_quantity_and_unit(recipe_ingredients):
    """Extract bare ingredient names from a raw ingredient string.

    The "[재료]" marker is removed, "[양념]" is treated as a separator, and the
    text is split on "|". For each entry the last space-separated token (the
    quantity/unit) is dropped and any remaining "[...]" label is stripped.

    Args:
        recipe_ingredients: Raw ingredient text. Non-string values (for example
            the NaN produced by an unmatched left merge) are accepted.

    Returns:
        List of non-empty ingredient names; an empty list for non-string input.
    """
    # NaN is a float, so calling .replace on it would raise AttributeError.
    if not isinstance(recipe_ingredients, str):
        return []

    text = recipe_ingredients.replace("[재료]", "").replace("[양념]", "|")

    cleaned_ingredients = []
    for ingredient in text.split("|"):
        parts = ingredient.strip().split(" ")
        # Drop the trailing quantity token. An entry with a single token has no
        # quantity to drop, so keep it instead of discarding the ingredient.
        name_parts = parts[:-1] if len(parts) > 1 else parts
        cleaned_ingredient = re.sub(r"\[.*?\]", "", " ".join(name_parts)).strip()
        if cleaned_ingredient:
            cleaned_ingredients.append(cleaned_ingredient)

    return cleaned_ingredients


def remove_empty_lists(df):
    """Return the rows of `df` whose "CKG_MTRL_CN" value is truthy (e.g. a non-empty list)."""
    has_ingredients = df["CKG_MTRL_CN"].astype(bool)
    return df.loc[has_ingredients]

In [None]:
def preprocessing(df):
    """Join recipe attributes onto `df` and derive the model input columns.

    Reads the module-level `recipe` frame. Rows are matched on
    `df['food'] == recipe['CKG_NM']`; both key columns are dropped afterwards.
    A `bmi` column is added from `weight` and `height` (height divided by 100
    before squaring), and `CKG_MTRL_CN` is reduced to a space-joined string of
    ingredient names.

    Rows are removed when the food has no matching recipe or when no ingredient
    name survives cleaning.

    Args:
        df: Frame with at least `food`, `weight` and `height` columns.

    Returns:
        New DataFrame with a fresh 0..n-1 index; `df` itself is not modified.
    """
    merged = (
        pd.merge(df, recipe, left_on='food', right_on='CKG_NM', how='left')
        .drop(columns=['CKG_NM', 'food'])
        # The left merge leaves NaN ingredient text for unmatched foods; drop those
        # rows up front so the string cleaning below never receives a float.
        .dropna(subset=['CKG_MTRL_CN'])
        # assign() returns a new frame, so no write ever lands on a filtered view
        # (the original assignment after filtering raised SettingWithCopyWarning).
        .assign(
            bmi=lambda d: d['weight'] / (d['height'] / 100) ** 2,
            CKG_MTRL_CN=lambda d: d['CKG_MTRL_CN'].apply(remove_quantity_and_unit),
        )
    )
    merged = remove_empty_lists(merged)
    merged = merged.assign(CKG_MTRL_CN=merged['CKG_MTRL_CN'].apply(" ".join))
    return merged.reset_index(drop=True)

In [None]:
# Run the same preprocessing on both splits (train first, then test).
train, test = preprocessing(train), preprocessing(test)

In [None]:
# Stack train on top of test (row indices are NOT reset, so they repeat), and
# record which columns of train have object dtype.
daset = pd.concat((train, test), axis=0)
cat_cols = train.select_dtypes(include='object').columns

In [None]:
def apply_w2v(sentences, model, num_features):
    """Embed each sentence as the mean of its in-vocabulary word vectors.

    Args:
        sentences: Iterable of token sequences.
        model: Object exposing `wv.index_to_key` and `wv[token]` lookups.
        num_features: Length of each word vector.

    Returns:
        `np.array` with one float64 row per sentence; a sentence without any
        known token yields an all-zero row.
    """
    known_tokens = set(model.wv.index_to_key)

    def _mean_vector(tokens):
        total = np.zeros((num_features,), dtype="float64")
        hits = 0.
        for token in tokens:
            if token not in known_tokens:
                continue
            hits = hits + 1.
            total = np.add(total, model.wv[token])
        # Leave the zero vector untouched when nothing matched.
        return np.divide(total, hits) if hits else total

    rows = []
    for sentence in sentences:
        rows.append(_mean_vector(sentence))
    return np.array(rows)
def gen_cat2vec_sentences(data):
    """Turn each row of `data` into a list of "<column> <value>" tokens.

    Works on a deep copy, so `data` is left untouched. Missing values are
    replaced by the literal 'unknow' before the tokens are built.

    Returns:
        List of rows, each a list with one token per column.
    """
    frame = copy.deepcopy(data)
    for col in list(frame.columns.values):
        as_category = frame[col].fillna('unknow').astype('category')
        labels = ["%s %s" % (col, level) for level in as_category.cat.categories]
        frame[col] = as_category.cat.rename_categories(labels)
    return frame.values.tolist()

In [None]:
# Embedding size equals the number of categorical columns.
n_cat2vec_feature  = len(cat_cols)
# Each sentence holds one token per categorical column, so this window always
# spans the whole sentence.
n_cat2vec_window   = len(cat_cols) * 2

def fit_cat2vec_model(random_state=1):
    """Train a Word2Vec model on shuffled "<column> <value>" sentences.

    Reads the module-level `daset`, `cat_cols`, `n_cat2vec_feature` and
    `n_cat2vec_window`. A 70% row sample is converted to token sentences and the
    tokens inside each sentence are shuffled before training.

    Args:
        random_state: Seed for the row sample, the token shuffle and Word2Vec.
            The default matches the Word2Vec seed used previously.

    Returns:
        The trained `Word2Vec` model.
    """
    from random import Random  # local import: only `shuffle` is imported at file level

    # A private RNG keeps the shuffle repeatable without touching global random state.
    rng = Random(random_state)
    sampled = daset.loc[:, cat_cols].sample(frac=0.7, random_state=random_state)
    X_w2v = gen_cat2vec_sentences(sampled)
    for sentence in X_w2v:
        rng.shuffle(sentence)
    # gensim documents that a seeded run is only repeatable with a single worker
    # thread (full determinism additionally needs PYTHONHASHSEED to be set).
    model = Word2Vec(
        X_w2v,
        vector_size=n_cat2vec_feature,
        window=n_cat2vec_window,
        seed=random_state,
        workers=1,
    )
    return model

c2v_model = fit_cat2vec_model()

In [None]:
# Replace the raw categorical columns of train/test with cat2vec embeddings.
# `daset` is train stacked on test, so the first len(train) rows are the train rows.
n_train_rows = len(train)
tr_sentences = gen_cat2vec_sentences(daset[cat_cols].iloc[:n_train_rows])
te_sentences = gen_cat2vec_sentences(daset[cat_cols].iloc[n_train_rows:])
tr_c2v_matrix = pd.DataFrame(apply_w2v(tr_sentences, c2v_model, n_cat2vec_feature))
te_c2v_matrix = pd.DataFrame(apply_w2v(te_sentences, c2v_model, n_cat2vec_feature))

# Both frames share the same cat2vec_1..cat2vec_k column names.
new_columns = [f"cat2vec_{i+1}" for i in range(tr_c2v_matrix.shape[1])]
tr_c2v_matrix.columns = new_columns
te_c2v_matrix.columns = new_columns

train = pd.concat([train.drop(columns=cat_cols), tr_c2v_matrix], axis=1)
test = pd.concat([test.drop(columns=cat_cols), te_c2v_matrix], axis=1)

In [None]:
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.decomposition import TruncatedSVD

tf_columns = ['CKG_MTRL_CN']

tfidf_vectorizer = TfidfVectorizer()
svd = TruncatedSVD(n_components=7, n_iter=7, random_state=42)

# For every text column: TF-IDF -> truncated SVD, then append the components to
# train and test. NOTE: both transformers are fitted on `daset`, i.e. on train
# and test rows together.
for col in tf_columns:
    n_train_rows = len(train)
    tfidf_matrix = tfidf_vectorizer.fit_transform(daset[col])
    svd_matrix = svd.fit_transform(tfidf_matrix)

    new_columns = [f"svd_{col}_{i+1}" for i in range(svd_matrix.shape[1])]
    svd_df = pd.DataFrame(svd_matrix, columns=new_columns)

    # The test slice carries positions n_train_rows.. as labels; relabel it with
    # test's own index so the column-wise concat lines up row by row.
    svd_df_te = svd_df.iloc[n_train_rows:].set_axis(test.index, axis=0)

    train = pd.concat([train, svd_df.iloc[:n_train_rows]], axis=1)
    test = pd.concat([test, svd_df_te], axis=1)

In [None]:
def label_encoding(series: pd.Series) -> pd.Series:
    """Map each value to the rank of its string form among the sorted unique strings.

    Values are converted with `astype(str)` first, so ordering is lexicographic
    (for example "10" sorts before "2").
    """
    as_text = series.astype(str)
    code_of = {value: idx for idx, value in enumerate(sorted(as_text.unique()))}
    return as_text.map(code_of)

In [None]:
label_columns = ['gender', 'CKG_STA_ACTO_NM', 'CKG_MTH_ACTO_NM', 'CKG_MTRL_ACTO_NM', 'CKG_KND_ACTO_NM']

# Encode on the stacked frame so train and test share one code table per column.
for col in label_columns:
    daset[col] = label_encoding(daset[col])

# Copy the codes back: the first len(train) rows of daset are train, the rest test.
n_train_rows = len(train)
encoded_labels = daset[label_columns]
train[label_columns] = encoded_labels.iloc[:n_train_rows]
test[label_columns] = encoded_labels.iloc[n_train_rows:]

In [None]:
# Shared random seed for the CV splitter and the boosted models below.
seed = 42

# "portion" is the regression target; every other column is a feature.
y_train = train["portion"]
x_train = train.drop(columns="portion")
y_test = test["portion"]
x_test = test.drop(columns="portion")

In [None]:
# Sanity check: print (rows, feature columns) of the training feature frame.
print(x_train.shape)

In [None]:
from sklearn.model_selection import KFold
from sklearn.metrics import mean_squared_error
import optuna
from lightgbm import LGBMRegressor
from sklearn.pipeline import make_pipeline
from xgboost import XGBRegressor
from catboost import CatBoostRegressor 

def cv_regression(model, k_fold):
    """Cross-validate `model` on the notebook-level training data.

    Reads the module-level `x_train`, `y_train`, `x_test` and `seed`. Missing
    feature values are replaced with 0 before every fit and predict.

    Args:
        model: Estimator or pipeline exposing `fit` and `predict`. It is used as
            a template only: each fold trains its own deep copy, so the object
            passed in is not fitted by this function.
        k_fold: Number of shuffled K-fold splits.

    Returns:
        Tuple `(k_rmse_score, test_prediction, models)`:
        per-fold validation RMSE (computed on predictions rounded to one
        decimal), per-fold unrounded predictions for `x_test`, and the fitted
        model of each fold.
    """
    # K-fold cross-validation setup
    kf = KFold(n_splits=k_fold, shuffle=True, random_state=seed)
    test_prediction = []
    k_rmse_score = []
    models = []
    # Identical for every fold, so fill it once outside the loop.
    x_test_filled = x_test.fillna(0)

    for fold, (train_idx, val_idx) in enumerate(kf.split(x_train, y_train)):

        # Split into training and validation parts
        x_train_fold, x_val_fold = x_train.iloc[train_idx], x_train.iloc[val_idx]
        y_train_fold, y_val_fold = y_train.iloc[train_idx], y_train.iloc[val_idx]

        # Train a fresh copy per fold. Refitting and appending the same object
        # would leave `models` holding k references to the last-fold model.
        fold_model = copy.deepcopy(model)
        fold_model.fit(x_train_fold.fillna(0), y_train_fold)
        models.append(fold_model)

        # Predict the validation part
        y_pred = fold_model.predict(x_val_fold.fillna(0))
        y_pred = np.round(y_pred, 1)

        # Evaluate the fold
        rmse = mean_squared_error(y_val_fold, y_pred)**0.5
        print(f"Fold {fold+1} - RMSE: {rmse}")
        k_rmse_score.append(rmse)

        # Predict the test data
        test_prediction.append(fold_model.predict(x_test_filled))

    return k_rmse_score, test_prediction, models

In [None]:
def lgb_objective(trial):
    """Optuna objective: mean 5-fold CV RMSE of an LGBMRegressor for one sampled configuration."""
    # Sampled hyper-parameters.
    # NOTE(review): per the LightGBM docs bagging stays disabled while
    # subsample_freq is 0 (the default), so `subsample` likely has no effect
    # here - confirm before relying on it.
    search_space = {
        'learning_rate' : trial.suggest_float('learning_rate', .001, .1, log = True),
        'max_depth' : trial.suggest_int('max_depth', 3, 10),
        'subsample' : trial.suggest_float('subsample', .5, 1),
        'min_child_weight' : trial.suggest_float('min_child_weight', .1, 15, log = True),
        'reg_lambda' : trial.suggest_float('reg_lambda', .1, 20, log = True),
        'reg_alpha' : trial.suggest_float('reg_alpha', .1, 10, log = True),
    }
    # Settings held constant across trials.
    fixed = {'n_estimators': 1000, 'random_state': seed}

    regressor = LGBMRegressor(**search_space, **fixed, verbosity=-1)
    fold_scores = cv_regression(make_pipeline(regressor), 5)[0]
    return np.mean(fold_scores)

lgb_study = optuna.create_study(direction = 'minimize')

In [None]:
# Run 50 trials of the LightGBM search.
lgb_study.optimize(lgb_objective, n_trials=50)

In [None]:
# Keep the best LightGBM trial and its sampled hyper-parameters.
# `trial.params` holds only values drawn through `trial.suggest_*`; the constants
# set inside the objective are not part of it.
trial = lgb_study.best_trial
lgb_params = trial.params

In [None]:
def xgb_objective(trial):
    """Optuna objective: mean 5-fold CV RMSE of an XGBRegressor for one sampled configuration."""
    # Sampled hyper-parameters.
    search_space = {
        'eta' : trial.suggest_float('eta', .001, .1, log = True),
        'max_depth' : trial.suggest_int('max_depth', 2, 30),
        'subsample' : trial.suggest_float('subsample', .5, 1),
        'colsample_bytree' : trial.suggest_float('colsample_bytree', .1, 1),
        'min_child_weight' : trial.suggest_float('min_child_weight', .1, 20, log = True),
        'reg_lambda' : trial.suggest_float('reg_lambda', .01, 20, log = True),
        'reg_alpha' : trial.suggest_float('reg_alpha', .01, 10, log = True),
    }
    # Settings held constant across trials.
    fixed = {'n_estimators': 1000, 'random_state': seed, 'tree_method': 'hist'}

    regressor = XGBRegressor(**search_space, **fixed, verbosity=0)
    fold_scores = cv_regression(make_pipeline(regressor), 5)[0]
    return np.mean(fold_scores)

xgb_study = optuna.create_study(direction = 'minimize')

In [None]:
# Run 20 trials of the XGBoost search.
xgb_study.optimize(xgb_objective, n_trials=20)

In [None]:
# Keep the best XGBoost trial and its sampled hyper-parameters.
# `trial.params` holds only values drawn through `trial.suggest_*`; constants set
# inside the objective (such as tree_method) are not part of it.
trial = xgb_study.best_trial
xgb_params = trial.params

In [None]:
def cat_objective(trial):
    """Optuna objective: mean 5-fold CV RMSE of a CatBoostRegressor for one sampled configuration."""
    # Sampled hyper-parameters.
    search_space = {
        "learning_rate": trial.suggest_float("learning_rate", 1e-3, 0.1, log=True),
        "depth": trial.suggest_int("depth", 1, 10),
        "subsample": trial.suggest_float("subsample", 0.05, 1.0),
        "colsample_bylevel": trial.suggest_float("colsample_bylevel", 0.05, 1.0),
        "min_data_in_leaf": trial.suggest_int("min_data_in_leaf", 1, 100),
    }
    # Settings held constant across trials.
    fixed = {"iterations": 1000, 'random_state': seed}

    regressor = CatBoostRegressor(**fixed, **search_space, verbose=0)
    fold_scores = cv_regression(make_pipeline(regressor), 5)[0]
    return np.mean(fold_scores)

cat_study = optuna.create_study(direction='minimize')

In [None]:
# Run 30 trials of the CatBoost search.
cat_study.optimize(cat_objective, n_trials=30)

In [None]:
# Keep the best CatBoost trial and its sampled hyper-parameters.
# `trial.params` holds only values drawn through `trial.suggest_*`; the constants
# set inside the objective are not part of it.
trial = cat_study.best_trial
cat_params = trial.params

In [None]:
# Re-run 5-fold CV with the best LightGBM parameters, keeping the per-fold test
# predictions and fitted models.
lgb_final = LGBMRegressor(n_estimators=1000, random_state=seed, verbosity=-1, **lgb_params)
optuna_model = make_pipeline(lgb_final)
optuna_score, lgb_pred, lgb_models = cv_regression(optuna_model, 5)

In [None]:
# Re-run 5-fold CV with the best XGBoost parameters, keeping the per-fold test
# predictions and fitted models.
# tree_method='hist' was a fixed setting during the search and is not part of
# `xgb_params`, so it is repeated here to train the configuration that was tuned.
optuna_model = make_pipeline(
            XGBRegressor(
                **xgb_params,
                n_estimators=1000,
                random_state=seed,
                tree_method='hist',
                verbosity=0,
            )
        )
optuna_score, xgb_pred, xgb_models = cv_regression(optuna_model, 5)

In [None]:
# Re-run 5-fold CV with the best CatBoost parameters, keeping the per-fold test
# predictions and fitted models.
cat_final = CatBoostRegressor(n_estimators=1000, random_state=seed, verbose=0, **cat_params)
optuna_model = make_pipeline(cat_final)
optuna_score, cat_pred, cat_models = cv_regression(optuna_model, 5)

In [None]:
# Test RMSE of each fold-averaged ensemble. The square root must be taken of the
# MSE; it was previously applied to the averaged predictions instead.
print('lgbm rmse : ', mean_squared_error(y_test, np.mean(lgb_pred, axis=0)) ** 0.5)
print('xgb rmse : ', mean_squared_error(y_test, np.mean(xgb_pred, axis=0)) ** 0.5)
print('cat rmse : ', mean_squared_error(y_test, np.mean(cat_pred, axis=0)) ** 0.5)

In [None]:
import joblib
def save_models(models, name, model_dir='models/'):
    """Dump each model to `<model_dir><name>_<i>.pkl` with joblib.

    Args:
        models: Sequence of fitted models; the position becomes the file suffix.
        name: File name prefix.
        model_dir: Path prefix prepended verbatim to the file name (include the
            trailing separator). The directory is created if it does not exist.
    """
    import os  # local import: `os` is not imported at file level

    for i, model in enumerate(models):
        filename = model_dir + f'{name}_{i}.pkl'
        # joblib.dump does not create missing directories, so make sure the
        # target exists (skipped when the file goes to the working directory).
        target_dir = os.path.dirname(filename)
        if target_dir:
            os.makedirs(target_dir, exist_ok=True)
        joblib.dump(model, filename)
        print(f'Model {i} saved as {filename}')
        
def load_models(model_dir='models/', name='', n_models=5):
    """Load `<model_dir><name>_<i>.pkl` for i in 0..n_models-1 and return them in order.

    NOTE: joblib.load unpickles the files, so only load files from a trusted source.
    """
    paths = (model_dir + f'{name}_{i}.pkl' for i in range(n_models))
    return [joblib.load(path) for path in paths]

In [None]:
# Persist the per-fold models of every library under its own file prefix.
for _prefix, _fold_models in (('lgb', lgb_models), ('xgb', xgb_models), ('cat', cat_models)):
    save_models(_fold_models, _prefix)

In [None]:
# Reload the saved fold models from disk (default directory and count).
lgb_models, xgb_models, cat_models = (
    load_models(name=prefix) for prefix in ('lgb', 'xgb', 'cat')
)

In [None]:
def predict_test_data(models, test_data):
    """Average the predictions of several models, sample by sample.

    Args:
        models: Iterable of objects exposing `predict(test_data)`.
        test_data: Feature data passed unchanged to every model.

    Returns:
        Array with one averaged prediction per sample.
    """
    predictions = [model.predict(test_data) for model in models]
    # axis=0 averages across models; without it np.mean collapses every sample
    # and every model into a single scalar.
    return np.mean(predictions, axis=0)

In [None]:
# The fold models were trained and cross-validated on features with missing
# values replaced by 0, so apply the same fill before predicting.
test_predictions = predict_test_data(lgb_models, x_test.fillna(0))
test_predictions