## 강화학습 모델 생성

In [3]:
from scipy.io import loadmat
import numpy as np
np.set_printoptions(formatter={'float': '{:0.5f}'.format})

import pandas as pd

# .mat 파일 로드
data = loadmat('pvLoadPriceData_test.mat')

# 데이터 추출 예시
# MATLAB 파일 내에 있는 변수 이름을 정확히 알아야 합니다. time, cloudyDay, clearDay, loadData, costData
time = data['time'].flatten()
solarData = data['clearDay'].flatten() * 10e3 
loadData = data['loadData'][:, 2].flatten() 
costData = data['costData'].flatten() 

print("자료형:", type(time), "차원" , time.shape, )
print("자료형:", type(solarData), "차원" , solarData.shape, )
print("자료형:", type(loadData), "차원" , loadData.shape, )
print("자료형:", type(costData), "차원" , costData.shape, )

solarData = solarData.tolist()
loadData = loadData.tolist()
costData = costData.tolist()

자료형: <class 'numpy.ndarray'> 차원 (228,)
자료형: <class 'numpy.ndarray'> 차원 (228,)
자료형: <class 'numpy.ndarray'> 차원 (228,)
자료형: <class 'numpy.ndarray'> 차원 (228,)


In [48]:
import numpy as np
import gym
from gym import spaces

# 전력망 환경 클래스 정의
class PowerGridEnv(gym.Env):
    def __init__(self):
        self.solar_data = solarData
        self.load_data = loadData
        self.price_data = costData

        BattCap = 2500  # 에너지 저장 장치 용량 [kWh]
        self.battEnergy = 3.6e3 * BattCap # 총 배터리 용량
        self.battery = ((0.5 * self.battEnergy)/self.battEnergy) * 100  # 초기 배터리 상태
        self.batteryMin = ((0.2 * self.battEnergy)/self.battEnergy) * 100
        self.batteryMax = ((0.8 * self.battEnergy)/self.battEnergy) * 100
        self.total_grid_cost = 0
        self.state = None

        # 배터리 상태
        self.observation_space = spaces.Box(low=np.array([0]), high=np.array([100]))
        
        # 액션 종류
        self.action_space = spaces.Discrete(7)
        self.data_pointer = 0
        

    def reset(self):
        self.data_pointer = 0
        self.battery = ((0.5 * self.battEnergy)/self.battEnergy) * 100
        self.total_grid_cost = 0
        self.state = [self.solar_data[0], self.load_data[0], self.price_data[0], self.battery]
        
        return np.array(self.state)
    
    def render(self, mode='human'):
        # 가정: self.state는 환경의 현재 상태를 나타냅니다.
        # 간단한 텍스트 출력으로 상태를 표현합니다.
        print(f"{self.state}")
        
    def step(self, action):
        reward = 0
       
        tPV = self.solar_data[self.data_pointer]    # 스탭 당 태양광 발전량
        tLoad = self.load_data[self.data_pointer]   # 스탭 당 부하량
        tPrice = self.price_data[self.data_pointer] # 스탭 당 전력 가격

        tP2B = 0
        tB2P = 0
        tNeedPower = 0

        # 액션에 따른 배터리 및 그리드 상호작용
        if action == 0 : # 태양광 O ESS X 그리드 X
            if tPV >= tLoad :
                tSurplusPower = tPV - tLoad # 잉여 전력 계산 -> 잉여 전력이 있다면 배터리에 저장
                if tSurplusPower > 0 : 
                    # 잉여 전력량
                    tP2B = (tSurplusPower / self.battEnergy) * 100

                    # 배터리의 충전 가능(%)
                    tPossible_Batt = self.batteryMax - self.battery

                    # 배터리에 충전할 전력 계산
                    tPowerChg = min(tP2B, tPossible_Batt) # 잉여 전력이랑 배터리 충전 가능량을 비교해서 더 작은 값을 충전
                    tSurplusChg = tP2B - tPowerChg # 충전하고 남은 전력량
                    
                    # 배터리를 충전했을 때 20~80% 일 때
                    if ((self.battery + tPowerChg) > self.batteryMin) and ((self.battery + tPowerChg) < self.batteryMax) :
                        self.battery += tPowerChg
                    # 배터리를 충전했을 때 80% 이상일 때
                    else :
                        self.total_grid_cost -= tP2B * tPrice

                
            else : # tPV < tLoad
                tNeedPower = tLoad - tPV # 필요 전력 계산 -> 필요 전력 그리드에서 구매
                tB2P = (tNeedPower/self.battEnergy)*100

                # 배터리의 현재 상태
                tPossible_Batt = self.battery - self.batteryMin

                # 사용 가능량
                tPowerDisChg = min(tB2P, tPossible_Batt)
                tSurplusDisChg = tB2P - tPowerDisChg # 방전하고 남은 전력량

                if (self.battery - tPowerDisChg >= self.batteryMin): # 방전했을 때 전력이 20% 이상라면
                        self.battery -= tPowerDisChg
                        self.total_grid_cost += tSurplusDisChg * tPrice

                # 사용 가능한 공간이 필요 전력보다 크다면
                if tPowerDisChg >= tB2P :
                    self.battery -= tB2P
                # 사용 가능한 공간이 필요 전력보다 작다면
                elif tPowerDisChg < tP2B :
                    # 사용할 전력
                    self.battery -= tPowerDisChg

                    # 더 필요한 전력 = 필요 전력 - 충전 가능한 량
                    need_discharging = tB2P - tPowerDisChg
                    self.total_grid_cost += need_discharging * tPrice


        elif action == 1 : # 태양광 O ESS O 그리드 X
            if tLoad > tPV :
                # 태양광 발전량으로 커버한 뒤 필요한 전력을 계산
                tNeedPower = tLoad - tPV
                tB2P = (tNeedPower/self.battEnergy)*100

                # 배터리의 사용 가능량
                possible_discharge = self.battery - self.batteryMin

                # 사용 가능한 공간이 필요 전력보다 크다면
                if possible_discharge >= tB2P :
                    self.battery -= tB2P
                # 사용 가능한 공간이 필요 전력보다 작다면
                elif possible_discharge < tP2B :
                    # 사용할 전력
                    self.battery -= possible_discharge

                    # 더 필요한 전력 = 필요 전력 - 충전 가능한 량
                    need_discharging = tB2P - possible_discharge
                    self.total_grid_cost += need_discharging * tPrice


            else : # tLoad < tPV 
                tSurplusPower = tPV - tLoad # 잉여 전력 계산 -> 잉여 전력이 있다면 배터리에 저장
                if (tSurplusPower > 0) :
                    # 잉여 전력을 배터리에 충전
                    tP2B = (tSurplusPower / self.battEnergy) * 100
                    
                    # 충전 가능량 확인
                    charging_amount = min(tP2B, self.batteryMax - self.battery)
                    if (charging_amount > tP2B) :
                        surplus_charging = charging_amount - tP2B
                    else :
                        surplus_charging = tP2B - charging_amount
                    surplus_charging = (surplus_charging/100) * self.battEnergy

                    if ((self.battery + tP2B) > self.batteryMin) and ((self.battery + tP2B) < self.batteryMax) : # 현재 배터리 잔량이 20~80% 사이면 충전
                        # tP2B가 작으면 그대로 충전
                        # 충전 가능량이 작으면 충전 가능량만 충전하고 나머지는 그리드 판매
                        self.battery += charging_amount
                        self.total_grid_cost += surplus_charging * tPrice

    
        elif action == 2 : # 태양광 O ESS X 그리드 O
            if tLoad > tPV :
                tNeedPower = tLoad - tPV # 태양광 발전량으로 커버한 뒤 필요한 전력을 계산
                if tNeedPower > 0 : # 전력이 더 필요할 경우
                    self.total_grid_cost += (tNeedPower * tPrice)

            else : # tLoad < tPV
                tSurplusPower = tPV - tLoad # 잉여 전력 계산 -> 잉여 전력이 있다면 배터리에 저장
                if (tSurplusPower > 0) :
                    # 잉여 전력을 배터리에 충전
                    tP2B = (tSurplusPower / self.battEnergy) * 100
                    
                    # 충전 가능량 확인
                    charging_amount = min(tP2B, self.batteryMax - self.battery)
                    
                    if (charging_amount > tP2B) :
                        surplus_charging = charging_amount - tP2B
                    else :
                        surplus_charging = tP2B - charging_amount

                    surplus_charging = (surplus_charging/100) * self.battEnergy

                    if ((self.battery + tP2B) > self.batteryMin) and ((self.battery + tP2B) < self.batteryMax) : # 현재 배터리 잔량이 20~80% 사이면 충전
                        # tP2B가 작으면 그대로 충전
                        # 충전 가능량이 작으면 충전 가능량만 충전하고 나머지는 그리드 판매
                        self.battery += charging_amount
                        self.total_grid_cost += surplus_charging * tPrice        


        elif action == 3 : # 태양광 O ESS O 그리드 O
            if tLoad > tPV :
                # 태양광 발전량으로 커버한 뒤 필요한 전력을 계산
                tNeedPower = tLoad - tPV
                tB2P = (tNeedPower/self.battEnergy)*100

                # 배터리의 사용 가능량
                possible_discharge = self.battery - self.batteryMin

                if tNeedPower > 0 : # 전력이 더 필요할 경우 ESS를 사용
                    tB2P = (tNeedPower/self.battEnergy)*100 

                    # 사용 가능한 공간이 필요 전력보다 크다면
                    if possible_discharge >= tB2P :
                        self.battery -= tB2P
                    # 사용 가능한 공간이 필요 전력보다 작다면
                    elif possible_discharge < tB2P :
                        # 사용할 전력
                        self.battery -= possible_discharge
            
                        # 더 필요한 전력 = 필요 전력 - 사용 가능한 량
                        need_discharging = tB2P - possible_discharge
                        self.total_grid_cost += need_discharging * tPrice
                    
                
            else : # tLoad < tPV
                tSurplusPower = tPV - tLoad # 잉여 전력 계산 -> 잉여 전력이 있다면 배터리에 저장
                if (tSurplusPower > 0) :
                    # 잉여 전력을 배터리에 충전
                    tP2B = (tSurplusPower / self.battEnergy) * 100
                    
                    # 충전 가능량 확인
                    charging_amount = min(tP2B, self.batteryMax - self.battery)
                    
                    if (charging_amount > tP2B) :
                        surplus_charging = charging_amount - tP2B
                    else :
                        surplus_charging = tP2B - charging_amount

                    surplus_charging = (surplus_charging/100) * self.battEnergy

                    if ((self.battery + tP2B) > self.batteryMin) and ((self.battery + tP2B) < self.batteryMax) : # 현재 배터리 잔량이 20~80% 사이면 충전
                        # tP2B가 작으면 그대로 충전
                        # 충전 가능량이 작으면 충전 가능량만 충전하고 나머지는 그리드 판매
                        self.battery += charging_amount
                        self.total_grid_cost += surplus_charging * tPrice
                            

        elif action == 4 : # 태양광 X ESS O 그리드 X
            if ((self.batteryMax - self.battery) > self.batteryMin):
                needPower = (tLoad/self.battEnergy) * 100
                tB2P = -min(needPower, (self.batteryMax - self.battery))
                self.battery += tB2P


        elif action == 5 : # 태양광 X ESS X 그리드 O
            self.total_grid_cost += tLoad * tPrice


        elif action == 6 : # 태양광 X ESS O 그리드 O
            self.battery -= tLoad
            if self.battery < 0:
                self.total_grid_cost += tPrice * abs(self.battery)
                self.battery = 0


        # 데이터 포인터 갱신
        if self.data_pointer >= len(self.solar_data) - 1:
            done = True
        else:
            done = False
            self.data_pointer += 1

        reward = 1 if done else 0

        self.state = [tPV, tLoad, self.battery, tP2B, -tB2P, action, self.total_grid_cost* 10e-6]
        return np.array(self.state), reward, done, {"cost" : self.total_grid_cost}
    
    
    


# 환경 인스턴스 생성 및 초기화
env = PowerGridEnv()
state = env.reset()
done = False

print("       PV        Load      SOC(%)   충전량   방전량     액션   누적 비용")

while not done:
    action = env.action_space.sample()  # 무작위 액션
    state, reward, done, info = env.step(action)
    
    print("", state, "보상", reward)


       PV        Load      SOC(%)   충전량   방전량     액션   누적 비용
 [35000.00000 5215.40860 50.00000 0.00000 0.00000 5.00000 3.98979] 보상 0
 [35100.00000 13735.69150 50.23738 0.23738 0.00000 3.00000 3.98979] 보상 0
 [34900.00000 22835.42484 0.00000 0.00000 0.00000 6.00000 21.42046] 보상 0
 [34500.00000 42857.48440 0.00000 0.00000 0.00000 5.00000 54.20643] 보상 0
 [34500.00000 53821.93141 0.00000 0.00000 0.00000 5.00000 95.38021] 보상 0
 [34700.00000 44613.88665 0.00000 0.00000 0.00000 5.00000 129.50983] 보상 0
 [34700.00000 48866.83350 20.00000 0.00000 -0.15741 1.00000 129.52525] 보상 0
 [34900.00000 44613.88665 0.00000 0.00000 0.00000 6.00000 163.63958] 보상 0
 [34500.00000 48866.83350 0.00000 0.00000 0.00000 5.00000 201.02270] 보상 0
 [34500.00000 44613.88665 40.00000 0.00000 -0.11238 0.00000 201.05348] 보상 0
 [34700.00000 48866.83350 40.00000 0.00000 0.00000 5.00000 238.43660] 보상 0
 [34700.00000 34917.21196 39.99759 0.00000 -0.00241 3.00000 238.43660] 보상 0
 [34900.00000 44613.88665 39.99759 0.00000 0.00000

In [43]:
# 환경 생성
env = PowerGridEnv()

# 시뮬레이션할 에피소드 수 설정
episodes = 100000
min_cost = float('inf')  # 최소 비용을 기록하기 위한 변수
best_state = None  # 최적의 상태를 저장하기 위한 변수

# 에피소드 반복 실행
for episode in range(1, episodes + 1):
    state = env.reset()  # 환경 초기화
    done = False 
    total_reward = 0

    while not done:
        action = env.action_space.sample()  # 무작위 액션 선택
        next_state, reward, done, info = env.step(action)  # 액션 적용
        total_reward += reward

        # 비용 최소화 조건 검사
        if done and info['cost'] < min_cost:
            min_cost = info['cost']
            best_state = state  # 최적 상태 갱신

    # 에피소드 결과 출력
    if episode % 1000 == 0:  # 진행 상황을 1000 에피소드마다 출력
        print(f'Episode: {episode}, Total Reward: {total_reward}, Minimum Cost: { min_cost * 10e-6}')
    

# 최적 상태와 그때의 비용 출력
print(f'Best State: {best_state}, Minimum Cost Achieved: {min_cost * 10e-6}')


Episode: 1000, Total Reward: 1, Minimum Cost: 14234.373887174559
Episode: 2000, Total Reward: 1, Minimum Cost: 13487.427182521064
Episode: 3000, Total Reward: 1, Minimum Cost: 13116.766916094726
Episode: 4000, Total Reward: 1, Minimum Cost: 12906.810592985565
Episode: 5000, Total Reward: 1, Minimum Cost: 12705.374955700303


KeyboardInterrupt: 

In [49]:
import random
import matplotlib.pyplot as plt

# 환경 초기화
env = PowerGridEnv()
state_size = len(loadData)
action_size = env.action_space.n

q_table = np.zeros((state_size, action_size))
alpha = 0.1     # 학습률
gamma = 0.9    # 할인율
epsilon = 0.1   # 탐험률

# 학습 과정
for i in range(1000):
    num = 0
    state = env.reset()
    done = False
    while not done:
        if random.uniform(0, 1) < epsilon:
            action = env.action_space.sample()
        else:
            action = np.argmin(q_table[num])

        next_state, reward, done, _ = env.step(action)
        old_value = q_table[num, action]
        next_max = np.max(q_table[num])
        new_value = (1 - alpha) * old_value + alpha * (reward + gamma * next_max)
        q_table[num, action] = new_value
        num += 1


In [51]:
env.reset()

array([35000.00000, 5215.40860, 76.50000, 50.00000])

In [55]:
done = False
num = 0
env.reset()
best_action_sequence = []  # 최적 액션 시퀀스 저장
cost = 0
while not done:
    action = np.argmin(q_table[num])
    best_action_sequence.append(action)  # 액션 시퀀스에 추가

    state, reward, done, _ = env.step(action)
    num += 1
    cost += state[6]
    env.render()

[35000.0, 5215.408596026129, 50.3309399044886, 0.33093990448859856, 0, 0, 0.0]
[35100.0, 13735.69150206051, 50.568321110021266, 0.23738120553266104, 0, 0, 0.0]
[34900.0, 22835.42483980296, 50.702371945134566, 0.13405083511330043, 0, 0, 0.0]
[34500.0, 42857.48439511476, 50.516650069687564, 0, -0.09286093772349736, 0, 0.0]
[34500.0, 53821.9314050352, 50.08727381624234, 0, -0.21468812672261328, 0, 0.0]
[34700.0, 44613.88665182545, 49.866965223979555, 0, -0.1101542961313939, 0, 0.0]
[34700.0, 48866.83350038779, 49.55214670174872, 0, -0.15740926111541986, 0, 0.0]
[34900.0, 44613.88665182545, 49.33628255393038, 0, -0.10793207390917169, 0, 0.0]
[34500.0, 48866.83350038779, 49.0170195872551, 0, -0.15963148333764207, 0, 0.0]
[34500.0, 44613.88665182545, 48.79226655054787, 0, -0.11237651835361612, 0, 0.0]
[34700.0, 48866.83350038779, 48.477448028317035, 0, -0.15740926111541986, 0, 0.0]
[34700.0, 34917.21196014155, 48.47262109586945, 0, -0.002413466223794967, 0, 0.0]
[34900.0, 44613.88665182545, 

In [40]:
cost

0

In [39]:
done = False
env.reset()
i = 0
cost = 0

print("       PV         Load      SOC(%)  충전량    방전량    액션   누적 비용")
while not done:
    state, reward, done, _ = env.step(best_action_sequence[i])
    i += 1
    print("", state, "보상", reward)


       PV         Load      SOC(%)  충전량    방전량    액션   누적 비용
 [35000.00000 5215.40860 50.33094 0.33094 0.00000 0.00000 0.00000] 보상 0
 [35100.00000 13735.69150 50.56832 0.23738 0.00000 0.00000 0.00000] 보상 0
 [34900.00000 22835.42484 50.70237 0.13405 0.00000 0.00000 0.00000] 보상 0
 [34500.00000 42857.48440 50.51665 0.00000 -0.09286 0.00000 0.00000] 보상 0
 [34500.00000 53821.93141 50.08727 0.00000 -0.21469 0.00000 0.00000] 보상 0
 [34700.00000 44613.88665 49.86697 0.00000 -0.11015 0.00000 0.00000] 보상 0
 [34700.00000 48866.83350 49.55215 0.00000 -0.15741 0.00000 0.00000] 보상 0
 [34900.00000 44613.88665 49.33628 0.00000 -0.10793 0.00000 0.00000] 보상 0
 [34500.00000 48866.83350 49.01702 0.00000 -0.15963 0.00000 0.00000] 보상 0
 [34500.00000 44613.88665 48.79227 0.00000 -0.11238 0.00000 0.00000] 보상 0
 [34700.00000 48866.83350 48.47745 0.00000 -0.15741 0.00000 0.00000] 보상 0
 [34700.00000 34917.21196 48.47262 0.00000 -0.00241 0.00000 0.00000] 보상 0
 [34900.00000 44613.88665 48.25676 0.00000 -0.10793 0.0