### Simple lightGBM example

In [8]:
import pandas as pd
import lightgbm as lgb
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error

data = {
    'timeStep': [
        '2023-08-14 20:30:00', '2023-08-14 20:45:00', '2023-08-14 21:00:00',
        '2023-08-14 22:30:00', '2023-08-14 22:45:00',
        '2023-08-14 23:15:00', '2023-08-14 23:45:00'
    ],
    'target': [
        34.0, 36.0, 32.0, 30.0, 34.0, 32.0, 36.0
    ]
}

print({len(data['timeStep'])}, {len(data['target'])})
data_df = pd.DataFrame(data)
data_df['timeStep'] = pd.to_datetime(data_df['timeStep'])
data_df.set_index('timeStep', inplace=True)

import plotly.graph_objects as go

fig = go.Figure()
fig.add_trace(go.Scatter(x=data_df.index, y=data_df['target'], mode='lines+markers', name='Data', line=dict(color='orange')))

fig.update_layout(title='Data',
                  xaxis_title='TimeStep',
                  yaxis_title='Target',
                  legend=dict(x=0, y=1))
fig.show()

# 특성(feature) 생성 - 과거 값들을 특성으로 사용
data_df['target_lag1'] = data_df['target'].shift(1)
data_df['target_lag2'] = data_df['target'].shift(2)
data_df['target_lag3'] = data_df['target'].shift(3)

# 결측치 제거 (처음 몇 개의 lag 값 때문에 생기는 NaN 제거)
data_df = data_df.dropna()

# 입력 변수(X)와 출력 변수(y) 정의
X = data_df[['target_lag1', 'target_lag2', 'target_lag3']]
y = data_df['target']

# 학습/검증 데이터 분할
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42, shuffle=False)

# LightGBM 데이터셋으로 변환
train_data = lgb.Dataset(X_train, label=y_train)
valid_data = lgb.Dataset(X_test, label=y_test, reference=train_data)

# 모델 파라미터 설정
params = {
    'objective': 'regression',
    'metric': 'rmse',
    'boosting_type': 'gbdt',
    'learning_rate': 0.05,
    'num_leaves': 31
}

# 모델 학습
num_round = 100
bst = lgb.train(params, train_data, num_round, valid_sets=[valid_data])

# 예측 수행
y_pred = bst.predict(X_test, num_iteration=bst.best_iteration)

# 예측 결과 평가
rmse = mean_squared_error(y_test, y_pred, squared=False)
print(f'RMSE: {rmse}')

# 예측 결과 출력
predicted_df = X_test.copy()
predicted_df['actual'] = y_test
predicted_df['predicted'] = y_pred
print(predicted_df)

import plotly.graph_objects as go

fig = go.Figure()
fig.add_trace(go.Scatter(x=predicted_df.index, y=predicted_df['actual'], mode='markers', name='Actual'))
fig.add_trace(go.Scatter(x=predicted_df.index, y=predicted_df['predicted'], mode='lines+markers', name='Predicted', line=dict(dash='dash')))
fig.add_trace(go.Scatter(x=data_df.index, y=data_df['target'], mode='lines+markers', name='Data', line=dict(color='orange')))

fig.update_layout(title='Actual vs Predicted',
                  xaxis_title='TimeStep',
                  yaxis_title='Target',
                  legend=dict(x=0, y=1))
fig.show()

{7} {7}


[LightGBM] [Info] Total Bins 0
[LightGBM] [Info] Number of data points in the train set: 3, number of used features: 0
[LightGBM] [Info] Start training from score 32.000000
RMSE: 4.0
                     target_lag1  target_lag2  target_lag3  actual  predicted
timeStep                                                                     
2023-08-14 23:45:00         32.0         34.0         30.0    36.0       32.0



'squared' is deprecated in version 1.4 and will be removed in 1.6. To calculate the root mean squared error, use the function'root_mean_squared_error'.



### lightGBM을 이용한 시계열 예측 모듈 (feature extraction 강화, feature reduction 기법)

In [None]:
# lightGBM을 이용한 시계열 예측 모듈
import numpy as np
import pandas as pd
import holidays
from lightgbm import LGBMRegressor
from sklearn.model_selection import train_test_split
from pandas.tseries.offsets import CustomBusinessDay
import datetime
from sklearn.metrics import mean_squared_error
import matplotlib.pyplot as plt
from datetime import timedelta

# 예제 데이터프레임 생성
#df = pd.read_csv('timeseries_data.csv', parse_dates=['date'])
date_rng = pd.date_range(start='2020-01-01', end='2022-12-31', freq='D')
df = pd.DataFrame(date_rng, columns=['date'])
df['target'] = np.random.randint(100, 1000, size=(len(date_rng)))
df.set_index('date', inplace=True)

# 시간 기반 피처 생성 (대한민국의 주말 및 공휴일 반영)
kr_holidays = holidays.KR()
kr_business_day = CustomBusinessDay(holidays=kr_holidays)

df['year'] = df.index.year
df['month'] = df.index.month
df['day'] = df.index.day
df['weekday'] = df.index.weekday
#df['is_weekend'] = df['weekday'].isin([5, 6]).astype(int)  # 주말 여부
#df['is_holiday'] = df.index.isin(kr_holidays).astype(int)  # 공휴일 여부

# 시간 지연 피처 생성
df['lag_1'] = df['target'].shift(1)
df['lag_2'] = df['target'].shift(2)
df['lag_7'] = df['target'].shift(7)
df['lag_30'] = df['target'].shift(30)

# 이동 평균 피처 생성
##df['rolling_mean_1'] = df['target'].rolling(window=1).mean()
##f['rolling_mean_2'] = df['target'].rolling(window=2).mean()
df['ma_7'] = df['target'].rolling(window=7).mean()
df['ma_30'] = df['target'].rolling(window=30).mean()

# 추세 및 계절성 분해 (간단한 방법)
df['trend'] = np.arange(len(df))
df['season'] = df['target'] - df['ma_30']

# 변동률 피처 생성
df['rate_of_change'] = (df['target'] - df['target'].shift(1)) / df['target'].shift(1)

# 계절성 피처 생성
df['sin_month'] = np.sin(2 * np.pi * df['month'] / 12)
df['cos_month'] = np.cos(2 * np.pi * df['month'] / 12)

# 주기성 특성 (Cyclical Features)
df['sine_day'] = np.sin(2 * np.pi * df.index.dayofyear / 365.25)
df['cosine_day'] = np.cos(2 * np.pi * df.index.dayofyear / 365.25)

# 윈도우 통계 (Window Statistics)
df['target_max_7'] = df['target'].rolling(window=7).max()  # 7일간의 최대값
df['target_min_7'] = df['target'].rolling(window=7).min()  # 7일간의 최소값
df['target_std_7'] = df['target'].rolling(window=7).std()  # 7일간의 표준편차

# 연휴 전후 여부 특성 추가 (설날 및 추석 연휴 포함)
major_holidays = ['설날', '추석']
for date in df.index:
    if date in kr_holidays:
        holiday_name = kr_holidays[date]
        if holiday_name in major_holidays:
            df.loc[df.index == date - datetime.timedelta(days=1), 'is_holiday'] = 1
            df.loc[df.index == date + datetime.timedelta(days=1), 'is_holiday'] = 1

# kr_holidays를 datetime으로 변환
holiday_dates = pd.to_datetime(list(kr_holidays.keys()))
#df['is_before_holiday'] = df.index.shift(-1, freq='D').isin(holiday_dates).astype(int)  # 연휴 전날 여부
#df['is_after_holiday'] = df.index.shift(1, freq='D').isin(holiday_dates).astype(int)  # 연휴 다음 날 여부

# 특정 명절 및 중요 휴일 특성 추가
major_holidays.append('어린이날')
df['is_major_holiday'] = df.index.to_series().apply(lambda x: 1 if x in kr_holidays and kr_holidays[x] in major_holidays else 0)

# 대체공휴일 반영
replacement_holidays = []
for date in kr_holidays:
    if date.weekday() in [5, 6]:  # 토요일(5)이나 일요일(6)에 해당하는 공휴일일 경우
        replacement_date = date + datetime.timedelta(days=1)
        while replacement_date.weekday() in [5, 6] or replacement_date in kr_holidays:
            replacement_date += datetime.timedelta(days=1)
        replacement_holidays.append(replacement_date)
df['is_replacement_holiday'] = df.index.isin(pd.to_datetime(replacement_holidays)).astype(int)

# 공휴일 전후 주중/주말 여부 추가
#df['is_holiday_weekend'] = ((df['is_holiday'] == 1) & df['is_weekend'] == 1).astype(int)  # 공휴일이 주말인 경우
#df['is_holiday_weekday'] = ((df['is_holiday'] == 1) & df['is_weekend'] == 0).astype(int)  # 공휴일이 주중인 경우
#df['is_month_end'] = df.index.is_month_end.astype(int)  # 월별 마지막 날 여부
#df['is_month_start'] = df.index.is_month_start.astype(int)  # 월초 여부

# 외부 변수 추가 (예: 기온 데이터)
#external_data = pd.read_csv('external_data.csv')
#df = df.merge(external_data, on='date', how='left')
#df['temperature'] = df['temperature']

# 결측값 처리 (특징 벡터 생성 후 생길 수 있는 NaN 처리)
#df.fillna(method='bfill', inplace=True)
#df.fillna(method='ffill', inplace=True)
df.bfill(inplace=True)
df.ffill(inplace=True)

# 특성과 타겟 분리
X = df.drop('target', axis=1)
y = df['target']

# 학습 및 테스트 데이터 분리
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, shuffle=False)

# 모델 학습
model = LGBMRegressor()
model.fit(X_train, y_train)

# 예측
y_pred = model.predict(X_test)


# 특성 중요도 계산 및 시각화 추가
import matplotlib.pyplot as plt  # 시각화를 위한 matplotlib 임포트

# 모델의 특성 중요도 추출
feature_importances = model.feature_importances_
feature_names = X.columns

# 특성 중요도를 데이터프레임으로 정리
importance_df = pd.DataFrame({'feature': feature_names, 'importance': feature_importances})
importance_df = importance_df.sort_values(by='importance', ascending=False)

# 특성 중요도 시각화
plt.figure(figsize=(10, 8))  # 그래프 크기 설정
plt.barh(importance_df['feature'], importance_df['importance'])  # 수평 막대그래프 생성
plt.xlabel('Feature Importance')  # x축 레이블 설정
plt.title('Feature Importance Visualization')  # 그래프 제목 설정
plt.gca().invert_yaxis()  # 중요도가 높은 순으로 표시
plt.show()  # 그래프 출력

In [None]:
from sklearn.metrics import mean_squared_error  # MSE calculation import

# Define functions for Feature Reduction
def select_top_n_features(feature_importance, n):
    return feature_importance.head(n)['feature'].tolist()

def select_cumulative_importance(feature_importance, threshold):
    total_importance = feature_importance['importance'].sum()
    feature_importance = feature_importance.copy()
    feature_importance['normalized_importance'] = feature_importance['importance'] / total_importance
    feature_importance['cumulative_importance'] = feature_importance['normalized_importance'].cumsum()
    return feature_importance[feature_importance['cumulative_importance'] <= threshold]['feature'].tolist()

def select_importance_threshold(feature_importance, threshold):
    return feature_importance[feature_importance['importance'] >= threshold]['feature'].tolist()

# Apply each method
top_10_features = select_top_n_features(importance_df, 10)  # Select top 10 features
cumulative_80_features = select_cumulative_importance(importance_df, 0.8)  # Select features with cumulative importance ≤ 80%
importance_threshold_features = select_importance_threshold(importance_df, 0.01)  # Select features with importance ≥ 0.01

# Create new datasets using selected features
X_top_10 = X[top_10_features]
X_cumulative_80 = X[cumulative_80_features]
X_importance_threshold = X[importance_threshold_features]

# Define function to train and evaluate models with reduced feature sets
def train_and_evaluate(X, y, feature_set_name):
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, shuffle=False)
    model = LGBMRegressor(n_estimators=100, random_state=42)
    model.fit(X_train, y_train)
    y_pred = model.predict(X_test)
    mse = mean_squared_error(y_test, y_pred)
    print(f"MSE for {feature_set_name}: {mse}")
    return model, mse

# Train and evaluate models with original and reduced feature sets
original_model, original_mse = train_and_evaluate(X, y, "Original Feature Set")
top_10_model, top_10_mse = train_and_evaluate(X_top_10, y, "Top 10 Features")
cumulative_80_model, cumulative_80_mse = train_and_evaluate(X_cumulative_80, y, "Cumulative 80% Importance Features")
importance_threshold_model, importance_threshold_mse = train_and_evaluate(X_importance_threshold, y, "Features with Importance >= 0.01")

# Compare results
results = pd.DataFrame({
    'Feature Set': ['Original', 'Top 10', 'Cumulative 80%', 'Importance >= 0.01'],
    'Number of Features': [X.shape[1], len(top_10_features), len(cumulative_80_features), len(importance_threshold_features)],
    'MSE': [original_mse, top_10_mse, cumulative_80_mse, importance_threshold_mse]
})

print(results)

# Visualize the distribution of selected feature sets
plt.figure(figsize=(12, 6))
plt.bar(results['Feature Set'], results['Number of Features'])
plt.title('Number of Features in Each Set')  # Visualize the number of features for each set
plt.ylabel('Number of Features')  # Set y-axis label
plt.show()

# Visualize MSE comparison
plt.figure(figsize=(12, 6))
plt.bar(results['Feature Set'], results['MSE'])
plt.title('Mean Squared Error (MSE) for Each Feature Set')  # Visualize the MSE for each set
plt.ylabel('MSE')  # Set y-axis label
plt.show()

# Print final selected feature sets
print("\nTop 10 Features:")
print(top_10_features)
print("\nCumulative 80% Importance Features:")
print(cumulative_80_features)
print("\nFeatures with Importance >= 0.01:")
print(importance_threshold_features)


In [None]:
import numpy as np
import pandas as pd
import lightgbm as lgb
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error
import datetime
import random
import matplotlib.pyplot as plt

# 1. 센서 데이터 생성
np.random.seed(42)
random.seed(42)
num_samples = 10000

dates = [datetime.datetime.now() - datetime.timedelta(minutes=5*i) for i in range(num_samples)]
# 싸인파와 노이즈의 합으로 온도, 습도, 조도 값을 생성
time_indices = np.arange(num_samples)

temperature = 30 + 5 * np.sin(2 * np.pi * time_indices / 1440) + np.random.normal(0, 1, num_samples)  # 평균 30도, 하루 주기의 싸인파 + 노이즈
humidity = 65 + 10 * np.sin(2 * np.pi * time_indices / 1440 + np.pi / 4) + np.random.normal(0, 2, num_samples)  # 평균 65%, 하루 주기의 싸인파 + 노이즈
illumination = 750 + 200 * np.sin(2 * np.pi * time_indices / 1440 + np.pi / 2) + np.random.normal(0, 50, num_samples)  # 평균 750 lx, 하루 주기의 싸인파 + 노이즈

# 데이터프레임으로 생성
data = pd.DataFrame({
    'datetime': dates,
    'temperature': temperature,
    'humidity': humidity,
    'illumination': illumination
})

# 2. Feature Engineering (시간특성 반영)
data['hour'] = data['datetime'].dt.hour
data['day_of_week'] = data['datetime'].dt.dayofweek
data['minute'] = data['datetime'].dt.minute
data.drop('datetime', axis=1, inplace=True)

# 3. LightGBM을 활용하여 예측 모델 만들기
# 예측하고자 하는 목표 변수 (타겟): 미래의 온도 (다음 5분 온도 예측)
data['future_temperature'] = data['temperature'].shift(-1)
data.dropna(inplace=True)

# Feature / Target 분리
features = ['temperature', 'humidity', 'illumination', 'hour', 'day_of_week', 'minute']
target = 'future_temperature'

X = data[features]
y = data[target]

# 데이터 분리 (학습용과 테스트용)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# 모델 생성 및 학습
train_data = lgb.Dataset(X_train, label=y_train)
test_data = lgb.Dataset(X_test, label=y_test, reference=train_data)

params = {
    'objective': 'regression',
    'metric': 'rmse',
    'boosting_type': 'gbdt',
    'learning_rate': 0.05,
    'num_leaves': 31
}

model = lgb.train(params,
                  train_data,
                  valid_sets=[test_data],
                  num_boost_round=1000)

# 4. 모델 성능 평가
predictions = model.predict(X_test)
mse = mean_squared_error(y_test, predictions)
rmse = np.sqrt(mse)
print(f'RMSE: {rmse:.2f}')

# 예측 결과 시각화 (랜덤 샘플링)
random_indices = np.random.choice(len(y_test), 100, replace=False)
y_test_sample = y_test.iloc[random_indices]
predictions_sample = predictions[random_indices]

plt.figure(figsize=(14, 6))
plt.plot(y_test_sample.values, label='Actual Temperature', color='blue', marker='o')
plt.plot(predictions_sample, label='Predicted Temperature', color='red', marker='x')
plt.xlabel('Sample Index')
plt.ylabel('Temperature (°C)')
plt.title('Actual vs Predicted Temperature (Random 100 Samples)')
plt.legend()
plt.show()
