# Case 1: One transmitter and one receiver.

(Article: Look-Ahead and Learning Approaches for Energy Harvesting Communications Systems)

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.colors import ListedColormap

## Step 1. Define problem, start and goal.

Point-to-point communication, one source (S) and one destination (D).
S has infinite buffer to store data, finite battery Bmax, Bi battery levels. (Cuantizada)

Model parameters:


In [None]:
# Time slotted system. Each time slot (Ts) is 1s duration. 
Tc = 1

# Bandwith
BW = 1 # MHz
# Noise Spectral density 
sigma_n2 =4e-21 * 1e6  # W/MHz 
sigma2 = 1e-9 # W/Hz 

# B is the Battery capacity. B_max is the maximum capacity. 
# Nb is the number of 'slots' in B (Quantized battery).
B_max = 12 #units
B_levels = list(range(B_max + 1)) 

# Energy harvested:
Eh = 0.05 #J
EH_levels = np.linspace(0,0.25,6)
energy_levels = [0, 1, 2, 3, 4, 5]     # unidades discretas
P_levels = EH_levels   # same as energy units

# Transition Energy Probability Matrix
E_tm = np.array([
      [0.4011, 0.3673, 0.1027, 0.0899, 0.0279, 0.0111],
      [0.4072, 0.3441, 0.1002, 0.0973, 0.0305, 0.0207],
      [0.3966, 0.3239, 0.1165, 0.0860, 0.0400, 0.0370],
      [0.3796, 0.3272, 0.1158, 0.0782, 0.0514, 0.0478],
      [0.3612, 0.3451, 0.1055, 0.0837, 0.0501, 0.0544],
      [0.3711, 0.3341, 0.1107, 0.0801, 0.0502, 0.0538]])

# Channel states:
H_states = np.linspace(0,-20,11) # In dB. Random transition probabilities between them.
channel_states = [0.01, 0.05, 0.1, 0.2]  # |h|^2
H_linear = 10 ** (H_states / 10)

print(H_linear)


[1.         0.63095734 0.39810717 0.25118864 0.15848932 0.1
 0.06309573 0.03981072 0.02511886 0.01584893 0.01      ]


Constraints:

In [None]:
# The energy harvesting and channel gain are modeled as two independent Markov chains.
def EnergyHarvested(Eprobability,actual_e,Ne): # Mirar si se usa
    if np.random.random() < Eprobability:
        # stays in the same energy level
        return actual_e
    else:
        # Jumps to other energy level
        return np.random.randint(Ne)
    
def EnergyCausalityConstraints(P_i,B_i):
    return Tc*P_i <= B_i

def PowerConstraint(P_i):
    return P_i >= 0

def BatteryConstraint(B_i):
    return B_i >= 0

def received_signal(Pi,Hi,xi): # Mirar si se usa
    return np.sqrt(Pi)*Hi*xi + np.random.normal(0, 1)


In [None]:
# starting state selected randomly.

## Step 2. Define RL Parameters

In [None]:
# runs
runs = 500
# Discount factor
gamma = 0.9
# Learning rate
alpha = 0.1
# e-greedy algorithm variables
def explorationProbability(i):
    return np.exp(-0.001*i)
# Convergence based algorithm variables: 
zeta = 4
def tao(EpisodeTotalAvailableTime):
    return 0.8*EpisodeTotalAvailableTime

Reward Funcition (amount of received data):

In [None]:
def RewardFunction(power_idx, h_idx):
    Power_i = P_levels[power_idx]
    Hi = H_linear[h_idx]
    return Tc*np.log2(1+(Power_i*(Hi**2))/sigma_n2)

MDP Reformulation

In [None]:
# ====== VALID ACTIONS ======
def valid_actions(b_idx):
    battery = B_levels[b_idx]
    return [p for p in P_levels if p <= battery]

# ====== Energy and channel probability transition ======
    # Probability matrix between energy states
def energy_transition_prob(e_idx, e_next_idx):
    return E_tm[e_idx, e_next_idx]
    # Random transition probabilities between channel states.
 
def channel_transition_prob():
    return np.random.uniform(0, 1)

def next_BatteryState(b_idx, e_idx, power_idx):
   B_i = B_levels[b_idx]
   E_i = EH_levels[e_idx]
   Power_i = P_levels[power_idx]
   B_next = min(max(B_i + E_i - Tc*Power_i,0), B_max)
   return B_levels.index(B_next)

    # Full state transition probability: 
def TransitionProbability(state, power_idx, next_state):
    b_idx, h_idx, e_idx = state
    b2, h2, e2 = next_state
    # state = (b, h, e)
    if b2 != next_BatteryState(b_idx, e_idx, power_idx):
        return 0.0
    
    return energy_transition_prob(e_idx,e2)*channel_transition_prob()



In [None]:
states = [B_levels, H_linear, EH_levels]
#s = (b_idx, h_idx, e_idx)
print(states)

## Look-ahead policy for EH communications

First problem: The source has causal knowledge about states (Knowledge about the past and current states).

The objective is to reduce the overflow energy. Use all overflow energy regardless of the channel state, otherwise it will be lost. The paper wants to find an optimal transmission policy:

Given the current battery level, channel condition, and harvested energy, how much power should I transmit to maximize long-term throughput?

In [None]:
def overflow_energy(battery_idx,energy_idx):
    battery = B_levels[battery_idx]
    energy = EH_levels[energy_idx]
    return np.max(battery+np.min(energy,B_max)-B_max,0)

Algorithm 1: 

In [None]:
def Bellman_equation(state, next_state, power_idx,next_power_idx):
    b_idx, h_idx, e_idx = state
    reward = RewardFunction(power_idx, h_idx) + gamma*TransitionProbability(state, next_power_idx, next_state)*RewardFunction(next_power_idx, next_state[1])
    return reward

In [None]:
""" num_states = len(B_levels) * len(H_states) * len(EH_levels)
V = np.zeros((len(B_levels), len(H_states), len(EH_levels)))
policy = np.zeros_like(V, dtype=int)
for b_idx in range(len(B_levels)):
        for h_idx in range(len(H_states)):
            for e_idx in range(len(EH_levels)):

                e_overflow = overflow_energy(b_idx,e_idx)

                for power_idx in valid_actions(b_idx) """
                

In [None]:
# Copia directa de chatgpt 
def lookahead_value(state, energy_used, P_e, P_h):
    b, h, e = state
    p_tx = energy_used / Tc
    p_idx = P_levels.index(p_tx)
    h_idx = H_linear.index(h)
    b_idx = B_levels.index(b)
    e_idx = B_levels.index(e)
    
    # Recompensa actual
    r_now = RewardFunction(p_idx, h_idx)
    
    # Batería futura
    b_next = next_BatteryState(b_idx, e_idx, p_idx)
    
    expected_future = 0.0
    
    for e_next, pe in P_e.items():
        for h2_next, ph in P_h.items():
            r_next = RewardFunction(b_next / Tc, h2_next)
            expected_future += pe * ph * r_next
    
    return r_now + gamma * expected_future

def lookahead_policy(state, P_e, P_h, M=None):
    b, h2, e = state
    
    e_ovf = overflow_energy(b, e)
    possible_energies = list(range(e_ovf, b + 1))
    
    if M is not None and M < len(possible_energies):
        possible_energies = np.random.sample(possible_energies, M)
    
    best_val = -np.inf
    best_energy = 0
    
    for lam in possible_energies:
        val = lookahead_value(state, lam, P_e, P_h)
        if val > best_val:
            best_val = val
            best_energy = lam
    
    return best_energy / Tc

## Aprendizaje por refuerzo (modelo desconocido)

Estructura Q-table

In [None]:
from collections import defaultdict
Q = defaultdict(lambda: defaultdict(float))

SARSA

In [None]:
def sarsa_update(s, a, r, s_next, a_next, alpha):
    Q[s][a] += alpha * (r + gamma * Q[s_next][a_next] - Q[s][a])

Q-learning

In [None]:
def q_learning_update(s, a, r, s_next, alpha):
    best_next = max(Q[s_next].values()) if Q[s_next] else 0
    Q[s][a] += alpha * (r + gamma * best_next - Q[s][a])

Exploración ε-greedy

In [None]:
def epsilon_greedy(state, actions, epsilon):
    if np.random.random() < epsilon:
        return np.random.choice(actions)
    return max(actions, key=lambda a: Q[state][a])