In [1]:
import numpy as np
import pandas as pd

df = pd.read_csv("data/merged.csv") 

In [2]:
environment = {
        "PFBS_NTRO_CBDX_CTRN": "관측지점실내이산화탄소농도",
        "ABSLT_HMDT": "절대습도",
        "INNER_HMDT_1": "내부습도_1",
        "INNER_HMDT_2": "내부습도_2",
        "AVE_INNER_HMDT_1_2": "평균내부습도_1_2",
        "WNDRC": "풍향",
        "WDSP": "풍속",
        "STRTN_WATER": "포화수분",
        "EXTN_SRQT": "외부일사량",
        "WATER_LACK_VL": "수분부족량",
        "EXTN_ACCMLT_QOFLG": "외부누적광량",
        "PRCPT_YN": "강수여부",
        "DWP_TPRT": "이슬점온도",
        "EXTN_TPRT": "외부온도",
        "INNER_TPRT_1": "내부온도_1",
        "INNER_TPRT_2": "내부온도_2",
        "AVE_INNER_TPRT_1_2": "평균내부온도_1_2",
}

df["SUB_MHRLS_OPRT_YN"] = df[["SUB_MHRLS_OPRT_YN_1","SUB_MHRLS_OPRT_YN_2", "SUB_MHRLS_OPRT_YN_3"]].apply(
    lambda x: 0 if 0 in x.values else 1, axis=1)

In [3]:
print(df["SUB_MHRLS_OPRT_YN"].value_counts())
var = df["SUB_MHRLS_OPRT_YN"].var()
for k in environment.keys():
    covariance = df["SUB_MHRLS_OPRT_YN"].cov(df[k])
    print(f"{environment[k]}: \t\t Cov is {covariance:.4f} \t Beta is {covariance/var:.4f}")

SUB_MHRLS_OPRT_YN
0    28940
1    19444
Name: count, dtype: int64
관측지점실내이산화탄소농도: 		 Cov is -10.7342 	 Beta is -44.6559
절대습도: 		 Cov is 0.5911 	 Beta is 2.4592
내부습도_1: 		 Cov is -0.5594 	 Beta is -2.3271
내부습도_2: 		 Cov is -0.2534 	 Beta is -1.0540
평균내부습도_1_2: 		 Cov is -0.4064 	 Beta is -1.6905
풍향: 		 Cov is -1.3323 	 Beta is -5.5427
풍속: 		 Cov is 0.0189 	 Beta is 0.0787
포화수분: 		 Cov is 0.9000 	 Beta is 3.7442
외부일사량: 		 Cov is 21.0940 	 Beta is 87.7546
수분부족량: 		 Cov is 0.3086 	 Beta is 1.2840
외부누적광량: 		 Cov is 65.9143 	 Beta is 274.2143
강수여부: 		 Cov is 0.0142 	 Beta is 0.0593
이슬점온도: 		 Cov is 0.6378 	 Beta is 2.6535
외부온도: 		 Cov is 2.0061 	 Beta is 8.3456
내부온도_1: 		 Cov is 0.7847 	 Beta is 3.2645
내부온도_2: 		 Cov is 0.7182 	 Beta is 2.9877
평균내부온도_1_2: 		 Cov is 0.7514 	 Beta is 3.1261


In [5]:
from sklearn.model_selection import train_test_split

x_cols = environment.keys()
y_cols = ["SUB_MHRLS_OPRT_YN"]

X = df[x_cols].values
y = df[y_cols].values.ravel()
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

In [10]:
df[y_cols].describe().T

Unnamed: 0,count,mean,std,min,25%,50%,75%,max
SUB_MHRLS_OPRT_YN,48384.0,0.401868,0.490281,0.0,0.0,0.0,1.0,1.0


using scikit-learn

In [6]:
from sklearn.svm import SVC
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import accuracy_score

# SVM 모델 정의 및 학습
model = SVC(kernel='rbf', C=1, gamma='auto')
model.fit(X_train, y_train)

# 테스트 및 평가
y_pred = model.predict(X_train)
train_accuracy = accuracy_score(y_train, y_pred)
y_pred = model.predict(X_test)
test_accuracy = accuracy_score(y_test, y_pred)
print(f'SVC Accuracy: train {train_accuracy} test {test_accuracy}')

# KNN 모델 정의 및 학습
model = KNeighborsClassifier(n_neighbors=2)
model.fit(X_train, y_train)

# 테스트 및 평가
y_pred = model.predict(X_train)
train_accuracy = accuracy_score(y_train, y_pred)
y_pred = model.predict(X_test)
test_accuracy = accuracy_score(y_test, y_pred)
print(f'KNeighborsClassifier Accuracy: train {train_accuracy} test {test_accuracy}')



SVC Accuracy: train 0.9960730617201023 test 0.7142709517412421
KNeighborsClassifier Accuracy: train 0.9523858733562405 test 0.8923220006200269


LSTM: 성능이 저 둘의 중간일뿐.

In [6]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

# LSTM 모델 정의
class LSTMModel(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, output_size):
        super(LSTMModel, self).__init__()
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)
    
    def forward(self, x):
        out, _ = self.lstm(x)
        out = self.fc(out[:, -1, :])  # LSTM의 마지막 시퀀스만 사용
        return out

In [7]:
# 시계열 데이터를 시퀀스로 변환
seq_length = 1  # 시퀀스 길이
X_sequences = []
for i in range(len(X) - seq_length + 1):
    X_sequences.append(X[i:i+seq_length])

X_sequences = np.array(X_sequences)
y = y[seq_length - 1:]  # 레이블을 시퀀스에 맞게 조정


# 데이터를 PyTorch Tensor로 변환
X_tensor = torch.tensor(X_sequences, dtype=torch.float32)
y_tensor = torch.tensor(y, dtype=torch.float32)

# 데이터셋 및 데이터로더 생성
dataset = TensorDataset(X_tensor, y_tensor)
train_loader = DataLoader(dataset, batch_size=32, shuffle=True)

In [12]:
# 모델, 손실 함수 및 옵티마이저 설정
input_size = X_sequences.shape[2]  # 입력 크기: 특성 수
hidden_size = 200
num_layers = 1
output_size = 1  # 출력 크기: 이진 분류이므로 1
model = LSTMModel(input_size, hidden_size, num_layers, output_size)
criterion = nn.BCEWithLogitsLoss()  # 이진 분류용 손실 함수
optimizer = optim.Adam(model.parameters(), lr=0.0001)

# 학습
num_epochs = 100
for epoch in range(1, num_epochs+1):
    for inputs, targets in train_loader:
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, targets.unsqueeze(1))  # 레이블을 2D로 변환하여 손실 함수에 전달
        loss.backward()
        optimizer.step()
    if epoch % 10 == 0:
        print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}')


Epoch [11/100], Loss: 0.3298
Epoch [21/100], Loss: 0.7945
Epoch [31/100], Loss: 0.5782
Epoch [41/100], Loss: 0.1897
Epoch [51/100], Loss: 0.3416
Epoch [61/100], Loss: 0.2379
Epoch [71/100], Loss: 0.4609
Epoch [81/100], Loss: 0.2952
Epoch [91/100], Loss: 0.2494
Epoch [101/100], Loss: 0.1468


In [13]:
# 평가
from sklearn.metrics import accuracy_score, classification_report
with torch.no_grad():
    outputs = model(X_tensor)
    predicted = (torch.sigmoid(outputs) > 0.5).float()  # 확률을 기준으로 0 또는 1로 변환
    accuracy = accuracy_score(y_tensor, predicted)
    print(f'Train Accuracy: {accuracy:.4f}')

Test Accuracy: 0.8768


LSTM with Longer sequence: 1->10 

In [8]:
# 시계열 데이터를 시퀀스로 변환
seq_length = 10  # 시퀀스 길이
X_sequences = []
for i in range(len(X) - seq_length + 1):
    X_sequences.append(X[i:i+seq_length])

X_sequences = np.array(X_sequences)
y = y[seq_length - 1:]  # 레이블을 시퀀스에 맞게 조정


# 데이터를 PyTorch Tensor로 변환
X_tensor = torch.tensor(X_sequences, dtype=torch.float32)
y_tensor = torch.tensor(y, dtype=torch.float32)

# 데이터셋 및 데이터로더 생성
dataset = TensorDataset(X_tensor, y_tensor)
train_loader = DataLoader(dataset, batch_size=32, shuffle=True)

In [22]:
# 모델, 손실 함수 및 옵티마이저 설정
input_size = X_sequences.shape[2]  # 입력 크기: 특성 수
hidden_size = 200
num_layers = 1
output_size = 1  # 출력 크기: 이진 분류이므로 1
model = LSTMModel(input_size, hidden_size, num_layers, output_size)
criterion = nn.BCEWithLogitsLoss()  # 이진 분류용 손실 함수
optimizer = optim.Adam(model.parameters(), lr=0.0001)

# 학습
num_epochs = 100
for epoch in range(1, num_epochs+1):
    for inputs, targets in train_loader:
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, targets.unsqueeze(1))  # 레이블을 2D로 변환하여 손실 함수에 전달
        loss.backward()
        optimizer.step()
    if epoch % 10 == 0:
        print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}')


Epoch [11/100], Loss: 0.3782
Epoch [21/100], Loss: 0.6070
Epoch [31/100], Loss: 0.2058
Epoch [41/100], Loss: 0.2446
Epoch [51/100], Loss: 0.2536
Epoch [61/100], Loss: 0.3810
Epoch [71/100], Loss: 0.2049
Epoch [81/100], Loss: 0.4170
Epoch [91/100], Loss: 0.1060
Epoch [101/100], Loss: 0.2612


In [23]:
# 평가
from sklearn.metrics import accuracy_score, classification_report
with torch.no_grad():
    outputs = model(X_tensor)
    predicted = (torch.sigmoid(outputs) > 0.5).float()  # 확률을 기준으로 0 또는 1로 변환
    accuracy = accuracy_score(y_tensor, predicted)
    print(f'Train Accuracy: {accuracy:.4f}')

Train Accuracy: 0.9122


강화학습

In [None]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import random

# Q-network 구성
class QNetwork(nn.Module):
    def __init__(self, state_size, action_size, hidden_size):
        super(QNetwork, self).__init__()
        self.fc1 = nn.Linear(state_size, hidden_size)
        self.fc2 = nn.Linear(hidden_size, action_size)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = self.fc2(x)
        return x

# Offline Q-learning 알고리즘
def offline_q_learning(states, actions, rewards, next_states, dones, q_network, gamma):
    criterion = nn.MSELoss()
    optimizer = optim.Adam(q_network.parameters(), lr=0.001)
    
    for i in range(len(states)):
        state = states[i]
        action = actions[i]
        reward = rewards[i]
        next_state = next_states[i]
        done = dones[i]
        
        # 목표 Q-value 계산
        with torch.no_grad():
            if done:
                target = reward
            else:
                target = reward + gamma * torch.max(q_network(torch.tensor(next_state, dtype=torch.float32)))
        
        # 현재 Q-value 계산
        current = q_network(torch.tensor(state, dtype=torch.float32))[action]
        
        # 손실 계산 및 역전파
        loss = criterion(current, target)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()



In [11]:
for i, j in train_loader:
    print(i.shape, j.shape)
    break

torch.Size([32, 10, 17]) torch.Size([32])


In [None]:
# 미리 수집한 샘플 데이터
states = [...] # 상태 데이터
actions = [...] # 행동 데이터
rewards = [...] # 보상 데이터
next_states = [...] # 다음 상태 데이터
dones = [...] # 종료 여부 데이터

# 환경 설정
state_size = len(states[0])
action_size = len(actions[0])

# 하이퍼파라미터 설정
hidden_size = 64
gamma = 0.99

# Q-network 초기화 및 학습
q_network = QNetwork(state_size, action_size, hidden_size)
offline_q_learning(states, actions, rewards, next_states, dones, q_network, gamma)


In [15]:
import numpy as np
from sklearn.svm import SVC

# 3차원 시계열 데이터 생성
# 각 데이터 포인트는 (seq_length, num_features) 형태의 2차원 배열로 가정
def generate_time_series_3d(num_samples, seq_length, num_features):
    X = np.random.randn(num_samples, seq_length, num_features)
    y = np.random.randint(2, size=num_samples)
    return X, y

# 데이터 생성
num_samples = 1000
seq_length = 50
num_features = 3  # 각 시간 단계에서의 특징 수
X, y = generate_time_series_3d(num_samples, seq_length, num_features)
print(X.shape, y.shape)

# 3차원 데이터를 2차원으로 변환
X_2d = X.reshape(num_samples, -1)
print(X[0].shape, X_2d[0].shape)

# SVM Classifier 초기화 및 훈련
svm_clf = SVC(kernel='linear')
svm_clf.fit(X_2d, y)

# 새로운 데이터에 대한 예측
new_data = np.random.randn(10, seq_length, num_features)  # 예시를 위해 임의의 새로운 데이터 생성
new_data_2d = new_data.reshape(10, -1)
predictions = svm_clf.predict(new_data_2d)
print("Predictions:", predictions)


(1000, 50, 3) (1000,)
(50, 3) (150,)
Predictions: [1 1 1 1 0 0 0 0 1 0]
