In [1]:
# Fetch the course repository; the CSV read by the data-loading cell comes from this clone.
!git clone https://github.com/FaridRash/Advance-Statistical-Learnin--Semester-3

Cloning into 'Advance-Statistical-Learnin--Semester-3'...
remote: Enumerating objects: 49, done.[K
remote: Counting objects: 100% (49/49), done.[K
remote: Compressing objects: 100% (38/38), done.[K
remote: Total 49 (delta 10), reused 11 (delta 0), pack-reused 0 (from 0)[K
Receiving objects: 100% (49/49), 33.02 MiB | 19.92 MiB/s, done.
Resolving deltas: 100% (10/10), done.


In [2]:
import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split

In [3]:
# Load the EUR/USD 15-minute candle file from the cloned repository.
# NOTE(review): the absolute /kaggle/working prefix only resolves when the clone
# was made from that working directory — confirm before running elsewhere.
data_path = "/kaggle/working/Advance-Statistical-Learnin--Semester-3/Dataset/EURUSD-2000-2020-15m.csv"
df = pd.read_csv(data_path)

In [4]:

def EMA(df, base, target, period, alpha=False):
    """
    Append an Exponential Moving Average of column `base` to `df` as column `target`.

    The first `period` rows are replaced by their rolling mean (so only the last
    of them is non-NaN), and the exponential smoothing is run over that seeded
    series. `df` is modified in place and the same object is returned.

    Parameters:
    - df: DataFrame holding the `base` column.
    - base: Name of the column to smooth.
    - target: Name of the column that receives the EMA.
    - period: Window used for the seed average and, by default, the EWM span.
    - alpha: Optional smoothing factor; when left at False the span is used instead.
    """
    head_rows = df[:period][base]
    tail_rows = df[period:][base]
    seeded = pd.concat([head_rows.rolling(window=period).mean(), tail_rows])

    # Either smooth by span (default) or by an explicitly supplied alpha.
    smoothing = {"span": period} if alpha == False else {"alpha": alpha}
    df[target] = seeded.ewm(adjust=False, **smoothing).mean()
    return df

# EMA() writes the new column into `df` and returns that same object,
# so `df_ema` and `df` refer to one DataFrame after this line.
df_ema = EMA(df, 'CLOSE', 'EMA_20', 20)

In [5]:
# Preview the first rows; EMA_20 is NaN here because the rolling seed needs a full window of rows.
df_ema.head()

  has_large_values = (abs_vals > 1e6).any()
  has_small_values = ((abs_vals < 10 ** (-self.digits)) & (abs_vals > 0)).any()
  has_small_values = ((abs_vals < 10 ** (-self.digits)) & (abs_vals > 0)).any()


Unnamed: 0,DATE_TIME,HIGH,LOW,OPEN,CLOSE,EMA_20
0,2000.01.03 00:00:00,1.008,1.0073,1.0073,1.0077,
1,2000.01.03 00:15:00,1.0087,1.0076,1.0078,1.0086,
2,2000.01.03 00:30:00,1.0089,1.0079,1.0087,1.0079,
3,2000.01.03 00:45:00,1.0132,1.0078,1.0078,1.0128,
4,2000.01.03 01:00:00,1.0133,1.012,1.0129,1.0122,


# Packing

In [6]:
def create_packs(df, pack_size, columns=('HIGH', 'LOW', 'OPEN', 'CLOSE', 'EMA_20')):
    """
    Slice the selected columns of `df` into overlapping windows of consecutive rows.

    Window `i` holds rows `i .. i + pack_size - 1`, so neighbouring windows share
    `pack_size - 1` rows. NaN values in the source columns are passed through
    unchanged.

    Parameters:
    - df: DataFrame containing every column named in `columns`.
    - pack_size: Number of consecutive rows per window; must be at least 1.
    - columns: Column names to include, in the order they appear on the last axis.

    Returns:
    - Array of shape (num_packs, pack_size, len(columns)). When `df` has fewer
      than `pack_size` rows the result is empty but still three-dimensional.

    Raises:
    - ValueError: if `pack_size` is smaller than 1.
    """
    if pack_size < 1:
        raise ValueError(f"pack_size must be at least 1, got {pack_size}")

    numerical_data = df[list(columns)].values
    num_packs = len(numerical_data) - pack_size + 1

    if num_packs <= 0:
        # Keep the 3-D contract so callers can still unpack `.shape`.
        return np.empty((0, pack_size, numerical_data.shape[1]), dtype=numerical_data.dtype)

    return np.array([numerical_data[start:start + pack_size] for start in range(num_packs)])

In [7]:
# Each pack is a window of 100 consecutive candles; consecutive packs start one row apart.
pack_size = 100 
packs = create_packs(df_ema, pack_size)

In [8]:
# Confirm the container type and the (num_packs, pack_size, num_features) shape.
type(packs), packs.shape

(numpy.ndarray, (500652, 100, 5))

# Normalization

In [9]:
def normalize_packs(packs):
    """
    Min-max scale every feature across all packs and return an array of the same shape.

    The packs are flattened to a (rows, features) table, one MinMaxScaler is
    fitted on that whole table, and the scaled rows are folded back into
    (num_packs, pack_size, num_features).

    NOTE(review): the scaler is fitted on every pack passed in, so calling this
    before a train/validation/test split lets later data influence the scaling.
    """
    n_packs, window, n_features = packs.shape

    # One row per candle, one column per feature.
    flat_rows = packs.reshape(-1, n_features)
    scaled_rows = MinMaxScaler().fit_transform(flat_rows)

    return scaled_rows.reshape(n_packs, window, n_features)


In [10]:
# Scale all packs with a single scaler; the shape must match `packs`.
normalized_packs = normalize_packs(packs)

print("Shape of normalized packs:", normalized_packs.shape)

Shape of normalized packs: (500652, 100, 5)


In [11]:
train_size = 0.7
val_size = 0.1
test_size = 0.2

# Split chronologically (shuffle=False). The packs are overlapping windows of a
# time series: shuffling would put near-identical windows on both sides of the
# split and would break the "pack i + 1 follows pack i" ordering that the
# trading environment relies on when it steps through the packs.
packs_train, packs_temp, norm_packs_train, norm_packs_temp = train_test_split(
    packs, normalized_packs, test_size=(val_size + test_size), shuffle=False)

In [12]:
val_ratio = val_size / (val_size + test_size)  # Share of the held-out part that becomes validation
# Keep the held-out packs in their existing order (shuffle=False) so validation
# precedes test and consecutive packs stay adjacent within each subset.
packs_val, packs_test, norm_packs_val, norm_packs_test = train_test_split(
    packs_temp, norm_packs_temp, test_size=(1 - val_ratio), shuffle=False
)

In [13]:
# Sanity check: raw and normalized subsets were split together, so the sizes must match pairwise.
print("Packs Train size:", len(packs_train))
print("Packs Validation size:", len(packs_val))
print("Packs Test size:", len(packs_test))
print('-----------')
print("Normalized Train size:", len(norm_packs_train))
print("Normalized Validation size:", len(norm_packs_val))
print("Normalized Test size:", len(norm_packs_test))

Packs Train size: 350456
Packs Validation size: 50065
Packs Test size: 100131
-----------
Normalized Train size: 350456
Normalized Validation size: 50065
Normalized Test size: 100131


In [14]:
def get_state(packs, index):
    """
    Return the pack at `index` together with its successor.

    The second element is None when `index` points at the last pack.
    """
    following = index + 1
    if following < len(packs):
        return packs[index], packs[following]
    return packs[index], None


In [15]:
def select_action(q_values, epsilon):
    """
    Pick an action index with an epsilon-greedy rule.

    Parameters:
    - q_values: Q-value estimates for one state, as an array-like or as a
      tensor-like object exposing `detach().cpu().numpy()`.
    - epsilon: Probability of choosing a uniformly random action.

    Returns:
    - The chosen action index as a plain Python int.
    """
    # Tensors that track gradients cannot be handed to numpy directly.
    if hasattr(q_values, "detach"):
        q_values = q_values.detach().cpu().numpy()
    q_array = np.asarray(q_values)

    if np.random.rand() < epsilon:
        # Explore over as many actions as there are Q-values; fall back to the
        # three trading actions when no Q-vector was supplied.
        n_actions = q_array.shape[-1] if q_array.ndim else 3
        return int(np.random.choice(n_actions))

    # Exploit: index of the largest Q-value (flattened, so (1, n) inputs work).
    return int(np.argmax(q_array))

In [16]:
def calculate_reward(action, open_price, stop_loss, target, target_multiplier, close_price, low, high, max_hold, current_hold):
    """
    Score one step of the trading simulation.

    Parameters:
    - action: 'buy', 'sell' or 'hold'.
    - open_price: Entry price of the open position, or None when flat.
    - stop_loss / target: Exit levels of the open position.
    - target_multiplier: Reward paid when the target level is reached.
    - close_price, low, high: Prices of the candle being evaluated.
    - max_hold: Maximum number of holding steps, or None for no limit.
    - current_hold: Number of steps the position has been held.

    Returns:
    - -1.0 when the stop-loss is hit, `target_multiplier` when the target is
      hit, the relative price change minus 0.1 when the hold limit is reached,
      -0.05 for a plain hold, and 0.0 otherwise.

    NOTE(review): stop-loss and target are only checked on steps whose action is
    'buy' or 'sell'; the function is not told the direction of the open position.
    """
    if action == 'buy':
        # Stop-loss is checked first, so it wins if one candle touches both levels.
        if low <= stop_loss:
            return -1.0
        elif high >= target:
            return target_multiplier

    elif action == 'sell':
        if high >= stop_loss:
            return -1.0
        elif low <= target:
            return target_multiplier

    # A time-out needs both a configured limit and an open position; without
    # these guards a None max_hold or open_price raised TypeError here.
    elif max_hold is not None and open_price is not None and current_hold >= max_hold:
        return (close_price - open_price) / open_price - 0.1

    elif action == 'hold':
        return -0.05

    # Default: No action has triggered any event yet
    return 0.0

In [17]:
def transition_to_next_state(current_state, action, current_hold, packs, pack_index, 
                             open_price=None, stop_loss=None, target=None, 
                             max_hold=None, balance=1000, epsilon=0.1, target_multiplier=2):
    """
    Advance the simulation by one pack and score the chosen action.

    Parameters:
    - current_state: Not used by the transition; kept for call compatibility.
    - action: "buy", "sell" or "hold".
    - current_hold: Steps the open position has been held so far.
    - packs: Sequence of packs; each pack is either one candle or a window of
      candles with columns ordered HIGH, LOW, OPEN, CLOSE, ...
    - pack_index: Index of the current pack.
    - open_price, stop_loss, target: State of the open position (None when flat).
    - max_hold: Hold limit forwarded to `calculate_reward`.
    - balance: Account balance before this step.
    - epsilon: Probability of drawing a random stop-loss distance for a new position.
    - target_multiplier: Target distance expressed in multiples of the stop-loss distance.

    Returns:
    - (next_state, reward, open_price, stop_loss, target, current_hold, balance, done)
    """
    if pack_index >= len(packs) - 1:
        return None, 0.0, None, None, None, 0, balance, True  # End of episode

    # Move to the next pack
    next_state = packs[pack_index + 1]

    # Prices are read from the most recent candle of the pack. Indexing the pack
    # itself with 1/2/3 would pick whole rows of a window, and column 2 is OPEN,
    # not HIGH, in the HIGH, LOW, OPEN, CLOSE column order.
    latest_candle = np.asarray(next_state)
    if latest_candle.ndim > 1:
        latest_candle = latest_candle[-1]
    high, low, close = latest_candle[0], latest_candle[1], latest_candle[3]

    done = False

    # Action: Buy
    if action == "buy" and open_price is None:
        open_price = close  # Open a new buy position
        stop_loss_multiplier = random.uniform(0.005, 0.03) if random.random() < epsilon else 0.0175
        stop_loss = open_price * (1 - stop_loss_multiplier)
        target = open_price * (1 + target_multiplier * stop_loss_multiplier)  # Target is target_multiplier x stop-loss distance
        current_hold = 0  # Reset holding period

    # Action: Sell
    elif action == "sell" and open_price is None:
        open_price = close  # Open a new sell position
        stop_loss_multiplier = random.uniform(0.005, 0.03) if random.random() < epsilon else 0.0175
        stop_loss = open_price * (1 + stop_loss_multiplier)
        target = open_price * (1 - target_multiplier * stop_loss_multiplier)  # Target is target_multiplier x stop-loss distance
        current_hold = 0  # Reset holding period

    # Action: Hold
    elif action == "hold" and open_price is not None:
        current_hold += 1  # Increment holding period

    # Reward Calculation (centralized using `calculate_reward`)
    reward = calculate_reward(
        action=action,
        open_price=open_price,
        stop_loss=stop_loss,
        target=target,
        target_multiplier=target_multiplier,
        close_price=close,
        low=low,
        high=high,
        max_hold=max_hold,
        current_hold=current_hold,
    )

    # Update balance based on reward
    # NOTE(review): the reward is applied as a fraction of the whole balance, so a
    # stop-loss reward of -1.0 takes the balance to zero — confirm this is intended.
    balance += reward * balance

    # Reset parameters if position is closed
    # NOTE(review): any non-zero reward clears the position, which includes the
    # per-step hold penalty and not only stop-loss/target/hold-limit exits.
    if reward != 0.0:
        open_price, stop_loss, target = None, None, None
        current_hold = 0

    # Determine if the episode is done
    if pack_index >= len(packs) - 2:  # The pack just entered is the last one
        done = True

    return next_state, reward, open_price, stop_loss, target, current_hold, balance, done


In [18]:
def epsilon_greedy_parameters(epsilon, stop_loss_range, target_range, hold_range):
    """
    Choose stop-loss, target multiplier and max hold period epsilon-greedily.

    Each of the three values is decided independently: with probability
    `epsilon` it is sampled from its range, otherwise the midpoint of the range
    is used as the greedy choice.

    Parameters:
    - epsilon: Exploration probability.
    - stop_loss_range: Tuple (min_stop_loss, max_stop_loss) for the stop-loss percentage.
    - target_range: Tuple (min_target, max_target) for the target multiplier.
    - hold_range: Tuple (min_hold, max_hold) for the max hold period; the
      sampled value comes from `np.random.randint`, the greedy one from integer division.

    Returns:
    - (selected_stop_loss, selected_target, selected_max_hold)
    """
    def exploring():
        # A fresh draw per parameter, so the three decisions are independent.
        return np.random.rand() < epsilon

    selected_stop_loss = (
        np.random.uniform(*stop_loss_range)
        if exploring()
        else (stop_loss_range[0] + stop_loss_range[1]) / 2
    )
    selected_target = (
        np.random.uniform(*target_range)
        if exploring()
        else (target_range[0] + target_range[1]) / 2
    )
    selected_max_hold = (
        np.random.randint(*hold_range)
        if exploring()
        else (hold_range[0] + hold_range[1]) // 2
    )

    return selected_stop_loss, selected_target, selected_max_hold


In [19]:
class ForexTradingEnv:
    """
    Step-by-step trading environment over a sequence of packs.

    The environment walks through `packs` in order, keeps the state of at most
    one open position (entry price, stop-loss, target, holding time) plus an
    account balance, and delegates the transition logic to
    `transition_to_next_state`.
    """

    # Names of the actions, indexed by the integer a policy network outputs.
    # The transition logic compares actions against these strings, so integer
    # actions are translated in `step`. The index order is a convention fixed here.
    ACTIONS = ("hold", "buy", "sell")

    def __init__(self, packs, max_hold):
        self.packs = packs
        self.max_hold = max_hold
        self._clear_episode()

    def _clear_episode(self):
        """Put all per-episode bookkeeping back to its starting values."""
        self.current_index = 0
        self.current_hold = 0
        self.open_price = None
        self.stop_loss = None
        self.target = None
        self.balance = 1000  # Starting balance
        self.done = False

    @classmethod
    def _action_name(cls, action):
        """Translate an integer action index to its name; strings pass through unchanged."""
        if isinstance(action, str):
            return action
        index = int(action)
        if not 0 <= index < len(cls.ACTIONS):
            raise ValueError(
                f"Unknown action index {action!r}; expected 0..{len(cls.ACTIONS) - 1}."
            )
        return cls.ACTIONS[index]

    def reset(self):
        """Reset the environment to the initial state and return the first pack."""
        self._clear_episode()
        state, _ = get_state(self.packs, self.current_index)
        return state

    def step(self, action):
        """
        Take an action and transition to the next state.

        `action` may be an action name or an integer index into `ACTIONS`.
        Returns (next_state, reward, done).
        """
        if self.done:
            raise ValueError("Cannot step in a finished environment. Call reset().")

        action = self._action_name(action)

        # Current state
        current_state, _ = get_state(self.packs, self.current_index)

        # Transition to the next state
        result = transition_to_next_state(
            current_state=current_state,
            action=action,
            current_hold=self.current_hold,
            packs=self.packs,
            pack_index=self.current_index,
            open_price=self.open_price,
            stop_loss=self.stop_loss,
            target=self.target,
            max_hold=self.max_hold,
            balance=self.balance,
        )

        (next_state, reward, self.open_price, self.stop_loss, self.target,
         self.current_hold, self.balance, self.done) = result

        # Increment index if not done
        if not self.done:
            self.current_index += 1

        return next_state, reward, self.done

    def render(self):
        """Render the current environment state."""
        print(f"Index: {self.current_index}")
        print(f"Balance: {self.balance}")
        print(f"Open Price: {self.open_price}")
        print(f"Stop Loss: {self.stop_loss}")
        print(f"Target: {self.target}")
        print(f"Current Hold: {self.current_hold}")


In [20]:
import random
from collections import deque

class ReplayBuffer:
    """Fixed-capacity store of transitions; the oldest entries are dropped first."""

    def __init__(self, capacity):
        # A bounded deque discards from the left once `capacity` is reached.
        self.buffer = deque(maxlen=capacity)

    def push(self, state, action, reward, next_state, done):
        """Record one (state, action, reward, next_state, done) transition."""
        transition = (state, action, reward, next_state, done)
        self.buffer.append(transition)

    def sample(self, batch_size):
        """Return `batch_size` distinct stored transitions chosen at random."""
        batch = random.sample(self.buffer, batch_size)
        return batch

    def __len__(self):
        """Number of transitions currently stored."""
        stored = len(self.buffer)
        return stored

In [21]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np

class DQN(nn.Module):
    """
    Fully connected Q-network with two hidden ReLU layers of 128 units.

    Maps inputs whose last dimension is `state_size` to `action_size` outputs.
    """

    def __init__(self, state_size, action_size):
        super().__init__()
        hidden_units = 128
        # Layers are created in forward order and kept under `fc`.
        layers = [
            nn.Linear(state_size, hidden_units),
            nn.ReLU(),
            nn.Linear(hidden_units, hidden_units),
            nn.ReLU(),
            nn.Linear(hidden_units, action_size),
        ]
        self.fc = nn.Sequential(*layers)

    def forward(self, x):
        """Return the network output for input `x`."""
        q_values = self.fc(x)
        return q_values

def train_dqn(env, dqn_model, target_model, replay_buffer, optimizer, batch_size, gamma, epsilon, epsilon_decay, min_epsilon, num_episodes, target_update_freq):
    """
    Train `dqn_model` with experience replay and a periodically synced target network.

    Parameters:
    - env: Environment exposing `reset()` and `step(action)`; `step` is called
      with the integer action index and must return (next_state, reward, done).
    - dqn_model: Online Q-network; receives flattened states of shape (batch, features).
    - target_model: Network used for the bootstrap targets.
    - replay_buffer: Buffer exposing `push`, `sample` and `__len__`.
    - optimizer: Optimizer over the parameters of `dqn_model`.
    - batch_size: Number of transitions per gradient step.
    - gamma: Discount factor.
    - epsilon, epsilon_decay, min_epsilon: Exploration schedule, decayed once per episode.
    - num_episodes: Number of passes over the environment.
    - target_update_freq: Copy the online weights into the target network every this many episodes.

    Returns:
    - List with the loss of every gradient step.
    """
    losses = []
    loss_fn = nn.MSELoss()

    def _flatten(observation):
        # The network's Linear layers, `gather(1, ...)` and `max(1)` below all
        # need one feature vector per state, so windows are flattened to 1-D.
        return np.asarray(observation, dtype=np.float32).reshape(-1)

    for episode in range(num_episodes):
        state = _flatten(env.reset())
        total_reward = 0
        done = False

        while not done:
            # Select action; no gradients are needed for acting.
            with torch.no_grad():
                q_values = dqn_model(torch.as_tensor(state).unsqueeze(0))
            action = int(select_action(q_values.numpy(), epsilon))

            # Take action in the environment
            next_state, reward, done = env.step(action)
            total_reward += reward

            # A missing next state (end of data) is stored as zeros; its value
            # is masked out by `done` when targets are computed.
            next_state = np.zeros_like(state) if next_state is None else _flatten(next_state)

            # Store transition in replay buffer
            replay_buffer.push(state, action, reward, next_state, done)

            # Update state
            state = next_state

            # Training step
            if len(replay_buffer) >= batch_size:
                # Sample a batch
                transitions = replay_buffer.sample(batch_size)
                states, actions, rewards, next_states, dones = zip(*transitions)

                # Convert to tensors
                states = torch.as_tensor(np.stack(states), dtype=torch.float32)
                actions = torch.as_tensor(actions, dtype=torch.long).unsqueeze(1)
                rewards = torch.as_tensor(rewards, dtype=torch.float32)
                next_states = torch.as_tensor(np.stack(next_states), dtype=torch.float32)
                dones = torch.as_tensor(dones, dtype=torch.float32)

                # Compute Q values and targets
                current_q = dqn_model(states).gather(1, actions).squeeze(1)
                with torch.no_grad():
                    max_next_q = target_model(next_states).max(1)[0]
                    target_q = rewards + gamma * max_next_q * (1 - dones)

                # Compute loss
                loss = loss_fn(current_q, target_q)
                losses.append(loss.item())

                # Backpropagation
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

        # Update target network
        if episode % target_update_freq == 0:
            target_model.load_state_dict(dqn_model.state_dict())

        # Decay epsilon
        epsilon = max(min_epsilon, epsilon * epsilon_decay)

        print(f"Episode {episode + 1}/{num_episodes}, Total Reward: {total_reward}, Epsilon: {epsilon:.3f}")

    return losses
