##**기본 (cv 미사용)**

**라이브러리 임포트**

In [None]:
import matplotlib.pyplot as plt
import pandas as pd
import xgboost as xgb
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import train_test_split

**메인 코드**

In [None]:
# Baseline run without cross-validation: one train/test split, one model.

# Load the data: satellite band values plus the measured PM values.
data = pd.read_csv("satellite_pm_data.csv")

X = data.drop("PM_value", axis=1)  # features (band values)
y = data["PM_value"]  # target (PM 2.5)

# Train/test split.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Build the model.
# early_stopping_rounds is configured on the estimator: passing it to fit()
# was deprecated in xgboost 1.6 and removed in 2.0.
model = xgb.XGBRegressor(
    n_estimators=200,
    learning_rate=0.05,
    max_depth=6,
    subsample=0.8,
    colsample_bytree=0.8,
    random_state=42,
    early_stopping_rounds=10,
)

# Train.
# NOTE(review): the test split doubles as the early-stopping set, so the RMSE
# reported below is optimistically biased. Consider carving a separate
# validation split out of the training data.
model.fit(X_train, y_train, eval_set=[(X_test, y_test)], verbose=True)

# Predict.
y_pred = model.predict(X_test)

# Evaluate. The square root is taken explicitly because the `squared=False`
# argument of mean_squared_error was removed in scikit-learn 1.6.
rmse = mean_squared_error(y_test, y_pred) ** 0.5
print(f"RMSE: {rmse:.2f}")

# Plot feature importance (by gain).
xgb.plot_importance(model, importance_type='gain')
plt.show()

##**K-fold**

###지역 별로 나눠서

**라이브러리 임포트**

In [None]:
import os
from sklearn.model_selection import GroupKFold

In [None]:
'''
한 지역 데이터를 테스트로 쓰겠다는 의미
train : 강릉, 아산, 서산....
test : 광주

train : 아산, 서산, 광주....
test : 강릉
'''



# Load every per-region, per-year CSV in a directory into a single DataFrame.
def load_data(directory):
    """Read all ``<region>_<year>.csv`` files in *directory* into one frame.

    Each file's rows get two extra columns taken from the file name:
    ``region`` (everything before the last underscore) and ``year``
    (the part after it, converted to ``int``). Files that do not end in
    ``.csv`` are ignored.

    Args:
        directory: Path of the directory to scan (not recursive).

    Returns:
        A DataFrame with the rows of all files concatenated in file-name
        order and a fresh 0..n-1 index.

    Raises:
        ValueError: If the directory contains no CSV file, if a CSV file
            name has no ``_<year>`` suffix, or if the year part is not an
            integer.
    """
    frames = []
    # os.listdir returns entries in arbitrary order; sorting keeps the row
    # order (and therefore any seeded split built on it) reproducible.
    for file in sorted(os.listdir(directory)):
        if not file.endswith(".csv"):
            continue
        stem = file[:-len(".csv")]
        # Split on the LAST underscore so region names that themselves
        # contain underscores (e.g. "new_york_2020.csv") are handled.
        region, sep, year = stem.rpartition("_")
        if not sep or not region:
            raise ValueError(
                f"Cannot parse '<region>_<year>.csv' from file name: {file!r}"
            )
        data = pd.read_csv(os.path.join(directory, file))
        data["region"] = region
        data["year"] = int(year)
        frames.append(data)
    if not frames:
        # pd.concat([]) would raise an opaque "No objects to concatenate".
        raise ValueError(f"No CSV files found in directory: {directory!r}")
    return pd.concat(frames, ignore_index=True)

# Leave-regions-out cross-validation: each fold holds out whole regions.

# Data directory and load.
data_directory = "path_to_your_data"  # directory that holds the CSV files
combined_data = load_data(data_directory)

# Features and target.
# "region" is a string label used only as the grouping key; it is dropped
# from the features because xgb.DMatrix rejects object-dtype columns.
X = combined_data.drop(columns=["PM_value", "region"])  # features
y = combined_data["PM_value"]                           # target

groups = combined_data["region"]  # grouping key: region

# Group K-Fold cross-validation (needs at least 5 distinct regions).
gkf = GroupKFold(n_splits=5)
params = {
    "objective": "reg:squarederror",
    "learning_rate": 0.1,
    "max_depth": 6,
    "subsample": 0.8,
    "colsample_bytree": 0.8
}

# Per-fold results.
rmse_list = []

for train_index, test_index in gkf.split(X, y, groups):
    X_train, X_test = X.iloc[train_index], X.iloc[test_index]
    y_train, y_test = y.iloc[train_index], y.iloc[test_index]

    # Build DMatrix objects.
    dtrain = xgb.DMatrix(X_train, label=y_train)
    dtest = xgb.DMatrix(X_test, label=y_test)

    # Train.
    # NOTE(review): the held-out fold is also the early-stopping set, so the
    # fold RMSE is optimistically biased.
    model = xgb.train(params, dtrain, num_boost_round=200,
                      evals=[(dtest, 'eval')],
                      early_stopping_rounds=10,
                      verbose_eval=True)

    # Predict.
    y_pred = model.predict(dtest)

    # RMSE. The square root is taken explicitly because the `squared=False`
    # argument of mean_squared_error was removed in scikit-learn 1.6.
    rmse = mean_squared_error(y_test, y_pred) ** 0.5
    rmse_list.append(rmse)
    print(f"Fold RMSE: {rmse:.2f}")

# Mean RMSE over the folds.
print(f"Average RMSE across folds: {sum(rmse_list) / len(rmse_list):.2f}")

# Plot feature importance (by gain). Only the model from the last fold is
# shown, since `model` is overwritten on every iteration.
xgb.plot_importance(model, importance_type='gain')
plt.show()


In [None]:
'''
- 유사한 지역 끼리 그룹화 해서
- 클러스터링 활용하기
'''

from sklearn.cluster import KMeans

# Cross-validation where similar regions are grouped by K-Means clustering
# and each fold holds out one cluster of regions.

# Step 1: per-region summary statistics of the target.
# TODO: needs revision (flagged in the original notes).
region_stats = combined_data.groupby('region')['PM_value'].agg(['mean', 'std']).reset_index()
# The sample std is NaN for a region with a single row and KMeans rejects
# NaN input, so treat such regions as having zero spread.
region_stats['std'] = region_stats['std'].fillna(0.0)

# Step 2: clustering (K-Means).
# TODO: needs revision (flagged in the original notes).
n_clusters = 3  # desired number of groups
# n_init is pinned so the clustering does not change with the scikit-learn
# version (the default switched from 10 to 'auto' in 1.4).
kmeans = KMeans(n_clusters=n_clusters, n_init=10, random_state=42)
region_stats['cluster'] = kmeans.fit_predict(region_stats[['mean', 'std']])

# Step 3: attach the cluster id to every row via its region.
region_to_cluster = region_stats.set_index('region')['cluster'].to_dict()
combined_data['cluster'] = combined_data['region'].map(region_to_cluster)

# Step 4: Group K-Fold cross-validation, one fold per cluster.
gkf = GroupKFold(n_splits=n_clusters)  # same as the number of clusters
groups = combined_data['cluster']  # cluster-based groups

params = {
    "objective": "reg:squarederror",
    "learning_rate": 0.1,
    "max_depth": 6,
    "subsample": 0.8,
    "colsample_bytree": 0.8
}

rmse_list = []
X = combined_data.drop(columns=['PM_value', 'region', 'cluster'])
y = combined_data['PM_value']

for train_index, test_index in gkf.split(X, y, groups):
    X_train, X_test = X.iloc[train_index], X.iloc[test_index]
    y_train, y_test = y.iloc[train_index], y.iloc[test_index]

    dtrain = xgb.DMatrix(X_train, label=y_train)
    dtest = xgb.DMatrix(X_test, label=y_test)

    # NOTE(review): the held-out fold is also the early-stopping set, so the
    # fold RMSE is optimistically biased.
    model = xgb.train(params, dtrain, num_boost_round=200,
                      evals=[(dtest, 'eval')],
                      early_stopping_rounds=10,
                      verbose_eval=True)

    y_pred = model.predict(dtest)
    # The square root is taken explicitly because the `squared=False`
    # argument of mean_squared_error was removed in scikit-learn 1.6.
    rmse = mean_squared_error(y_test, y_pred) ** 0.5
    rmse_list.append(rmse)
    print(f"Fold RMSE: {rmse:.2f}")

print(f"Average RMSE across folds: {sum(rmse_list) / len(rmse_list):.2f}")

###랜덤하게

In [None]:
# method1: plain K-Fold with shuffling (rows assigned to folds at random).

from sklearn.model_selection import KFold

# Build features/target here so the cell does not depend on whichever X/y an
# earlier cell happened to leave behind. 'cluster' exists only if the
# clustering cell has been run, hence errors='ignore'.
X = combined_data.drop(columns=['PM_value', 'region', 'cluster'], errors='ignore')
y = combined_data['PM_value']

# KFold configuration.
kf = KFold(n_splits=5, shuffle=True, random_state=42)  # shuffle=True for random folds

# Per-fold results.
rmse_list = []
params = {
    "objective": "reg:squarederror",
    "learning_rate": 0.1,
    "max_depth": 6,
    "subsample": 0.8,
    "colsample_bytree": 0.8
}

for train_index, test_index in kf.split(X):
    X_train, X_test = X.iloc[train_index], X.iloc[test_index]
    y_train, y_test = y.iloc[train_index], y.iloc[test_index]

    # Build DMatrix objects.
    dtrain = xgb.DMatrix(X_train, label=y_train)
    dtest = xgb.DMatrix(X_test, label=y_test)

    # Train.
    # NOTE(review): the held-out fold is also the early-stopping set, so the
    # fold RMSE is optimistically biased.
    model = xgb.train(params, dtrain, num_boost_round=200,
                      evals=[(dtest, 'eval')],
                      early_stopping_rounds=10,
                      verbose_eval=True)

    # Predict and evaluate. The square root is taken explicitly because the
    # `squared=False` argument of mean_squared_error was removed in
    # scikit-learn 1.6.
    y_pred = model.predict(dtest)
    rmse = mean_squared_error(y_test, y_pred) ** 0.5
    rmse_list.append(rmse)
    print(f"Fold RMSE: {rmse:.2f}")

print(f"Average RMSE across folds: {sum(rmse_list) / len(rmse_list):.2f}")

In [None]:
# method2: repeated random 80/20 splits (test sets may overlap across splits).

from sklearn.model_selection import ShuffleSplit

# Build features/target and the booster parameters here so the cell does not
# depend on variables left behind by other cells. 'cluster' exists only if
# the clustering cell has been run, hence errors='ignore'.
X = combined_data.drop(columns=['PM_value', 'region', 'cluster'], errors='ignore')
y = combined_data['PM_value']

params = {
    "objective": "reg:squarederror",
    "learning_rate": 0.1,
    "max_depth": 6,
    "subsample": 0.8,
    "colsample_bytree": 0.8
}

# ShuffleSplit configuration.
ss = ShuffleSplit(n_splits=5, test_size=0.2, random_state=42)

# Per-split results.
rmse_list = []

for train_index, test_index in ss.split(X):
    X_train, X_test = X.iloc[train_index], X.iloc[test_index]
    y_train, y_test = y.iloc[train_index], y.iloc[test_index]

    # Build DMatrix objects.
    dtrain = xgb.DMatrix(X_train, label=y_train)
    dtest = xgb.DMatrix(X_test, label=y_test)

    # Train.
    # NOTE(review): the held-out split is also the early-stopping set, so the
    # split RMSE is optimistically biased.
    model = xgb.train(params, dtrain, num_boost_round=200,
                      evals=[(dtest, 'eval')],
                      early_stopping_rounds=10,
                      verbose_eval=True)

    # Predict and evaluate. The square root is taken explicitly because the
    # `squared=False` argument of mean_squared_error was removed in
    # scikit-learn 1.6.
    y_pred = model.predict(dtest)
    rmse = mean_squared_error(y_test, y_pred) ** 0.5
    rmse_list.append(rmse)
    print(f"Fold RMSE: {rmse:.2f}")

print(f"Average RMSE across splits: {sum(rmse_list) / len(rmse_list):.2f}")

###연도 별로 나눠서

In [None]:
# Leave-one-year-out evaluation: each distinct year is held out once.

# Distinct years, sorted so the folds run in chronological order.
years = sorted(combined_data['year'].unique())

# Per-year results.
rmse_list = []
params = {
    "objective": "reg:squarederror",
    "learning_rate": 0.1,
    "max_depth": 6,
    "subsample": 0.8,
    "colsample_bytree": 0.8
}

# Columns that are never model features. 'cluster' exists only if the
# clustering cell has been run, so it is dropped with errors='ignore'.
# NOTE(review): 'year' stays in the features although the held-out year is
# never seen during training - confirm this is intended.
non_feature_columns = ['PM_value', 'region', 'cluster']

# Custom split by year.
for year in years:
    # Hold out one year as the test set; train on all the others.
    train_data = combined_data[combined_data['year'] != year]
    test_data = combined_data[combined_data['year'] == year]

    X_train = train_data.drop(columns=non_feature_columns, errors='ignore')
    y_train = train_data['PM_value']
    X_test = test_data.drop(columns=non_feature_columns, errors='ignore')
    y_test = test_data['PM_value']

    # Build DMatrix objects.
    dtrain = xgb.DMatrix(X_train, label=y_train)
    dtest = xgb.DMatrix(X_test, label=y_test)

    # Train.
    # NOTE(review): the held-out year is also the early-stopping set, so the
    # reported RMSE is optimistically biased.
    model = xgb.train(params, dtrain, num_boost_round=200,
                      evals=[(dtest, 'eval')],
                      early_stopping_rounds=10,
                      verbose_eval=True)

    # Predict and evaluate. The square root is taken explicitly because the
    # `squared=False` argument of mean_squared_error was removed in
    # scikit-learn 1.6.
    y_pred = model.predict(dtest)
    rmse = mean_squared_error(y_test, y_pred) ** 0.5
    rmse_list.append(rmse)
    print(f"Year {year} as Test Set - RMSE: {rmse:.2f}")

# Mean RMSE over the held-out years.
print(f"Average RMSE across years: {sum(rmse_list) / len(rmse_list):.2f}")