In [None]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
# Load the price CSV, order it chronologically and index it by date.
data = pd.read_csv('/kaggle/input/stock-data/Nifty_50_with_indicators_.csv')
data['Date'] = pd.to_datetime(data['Date'])
data = data.sort_values(by='Date')
data = data.set_index(pd.DatetimeIndex(data['Date'].values))
# Keep only the columns used downstream. The explicit .copy() makes this an
# independent frame, so the indicator columns assigned to `data` later on do
# not trigger pandas' SettingWithCopyWarning.
data = data[['Date', 'close']].copy()
data

In [None]:
% pip install pandas_ta

In [None]:
#import talib  
import pandas_ta as ta

def calculate_technical_indicators(df):
    """
    Add RSI and EMA indicator columns computed from the 'close' column.

    Parameters:
        df (pd.DataFrame): Price data; must contain a 'close' column.

    Returns:
        df (pd.DataFrame): The same DataFrame object that was passed in, with the
            columns 'RSI', 'EMAF', 'EMAM' and 'EMAS' added (modified in place).

    Notes:
        The indicators are computed from the `df` argument, not from the
        notebook-level `data` variable, so the function works on any frame.
        The first rows of each indicator are expected to be NaN until enough
        history is available; callers drop those rows afterwards.
    """
    close = df['close']

    df['RSI'] = ta.rsi(close, length=15)   # Relative Strength Index
    # Exponential moving averages over three horizons
    # (presumably F/M/S = fast/medium/slow).
    df['EMAF'] = ta.ema(close, length=20)
    df['EMAM'] = ta.ema(close, length=100)
    df['EMAS'] = ta.ema(close, length=150)
    # Add more technical indicators as needed

    return df

def create_state_space(data, window_size=10):
    """
    Create the state space for stock price prediction.

    Each state is a sliding window of `window_size` consecutive rows of the
    min-max normalized feature frame, flattened into a single 1D vector.

    Parameters:
        data (pd.DataFrame): Historical stock price data with 'Date' and 'close' columns.
        window_size (int): The number of past data points to consider in the state.

    Returns:
        state_space (np.ndarray): One row per window, each of length
            window_size * number_of_columns. The array is empty when there are
            not more than `window_size` usable rows.

    Notes:
        No rows are removed from the caller's DataFrame, so calling this
        function repeatedly on the same frame yields the same result.
        calculate_technical_indicators may still add its indicator columns to
        the frame it is given.
    """
    # Calculate technical indicators
    features = calculate_technical_indicators(data)

    # Drop rows with NaN values (indicator warm-up period). dropna() returns a
    # new frame; the previous in-place drop removed rows from the caller's
    # DataFrame on every call.
    features = features.dropna()

    # Min-max normalize every column (optional but recommended for better model convergence)
    features = (features - features.min()) / (features.max() - features.min())

    # One window per start position; the range is empty when there are too few rows.
    num_windows = len(features) - window_size
    state_space = [
        features.iloc[start:start + window_size].values.flatten()
        for start in range(num_windows)
    ]

    return np.array(state_space)

# Example usage
# 'data' is the pandas DataFrame loaded in the first cell (columns 'Date' and 'close')
state_space = create_state_space(data, window_size=10)
print(state_space.shape)  # Output: (num_samples, num_features); each sample is one flattened window

In [None]:
# Integer codes for the three trading actions: buy = 0, sell = 1, hold = 2
ACTION_BUY, ACTION_SELL, ACTION_HOLD = range(3)


def create_action_space():
    """
    Build the list of actions available to the agent.

    Returns:
        action_space (list): A new list holding the action codes in the order
            buy, sell, hold.
    """
    return [ACTION_BUY, ACTION_SELL, ACTION_HOLD]


# Example usage
action_space = create_action_space()
print(action_space)  # Output: [0, 1, 2] representing 'Buy', 'Sell', and 'Hold' actions


In [None]:
import random

def create_q_table(state_space, action_space):
    """
    Build a randomly initialized Q-table.

    Parameters:
        state_space (np.ndarray): A 2D numpy array; each row is one state.
        action_space (list): A list of possible actions.

    Returns:
        q_table (dict): Maps each state (the row converted to a tuple) to a list
            with one random Q-value in [0, 1) per action. Rows that convert to
            the same tuple share one entry; the values drawn for the last such
            row are the ones kept.
    """
    n_states = state_space.shape[0]
    n_actions = len(action_space)

    # Keys are tuples because numpy rows are not hashable.
    return {
        tuple(state_space[row]): [random.random() for _ in range(n_actions)]
        for row in range(n_states)
    }

# Example usage
# state_space (window_size=10) and action_space were already built by the
# earlier example cells; rebuilding them here was redundant and re-ran the
# whole indicator / NaN-dropping pipeline on `data` a second time.
q_table = create_q_table(state_space, action_space)
print(len(q_table))

In [None]:
import random

def epsilon_greedy_action(q_table, state_idx, action_space, epsilon):
    """
    Choose an action using epsilon-greedy exploration.

    Parameters:
        q_table (dict): The Q-learning table, mapping a state key to a list of
            Q-values (one per entry of `action_space`, in the same order).
        state_idx: The key of the current state in `q_table`. The parameter name
            is kept for compatibility; tables built by create_q_table are keyed
            by the state tuple, so pass tuple(state_space[i]) rather than the
            integer row number i.
        action_space (list): A list of possible actions.
        epsilon (float): The probability of choosing a random action (exploration rate).

    Returns:
        chosen_action: An element of `action_space`, in both the exploration and
            the exploitation branch.

    Raises:
        KeyError: If `state_idx` is not a key of `q_table`.
    """
    if random.random() < epsilon:
        # Randomly choose an action (exploration)
        return random.choice(action_space)

    # Choose the action with the highest Q-value for the current state (exploitation)
    q_values = q_table[state_idx]
    max_q_value = max(q_values)
    # Map the position of each maximal Q-value back to the action it belongs to,
    # so the result is an action rather than a bare list index. Ties are broken
    # at random.
    best_actions = [
        action_space[position]
        for position, q_value in enumerate(q_values)
        if q_value == max_q_value
    ]
    return random.choice(best_actions)

# Example usage
state_space = create_state_space(data, window_size=60)  # Assuming window_size is 60
action_space = create_action_space()
q_table = create_q_table(state_space, action_space)

# Let's choose an action for a random row of the state space.
# q_table is keyed by the state tuple (see create_q_table), not by the row
# number, so convert the row to its tuple key before the lookup; using the
# integer directly raises KeyError whenever the exploitation branch runs.
state_idx = random.randint(0, state_space.shape[0] - 1)
state_key = tuple(state_space[state_idx])
epsilon = 0.2  # Exploration rate (0.2 means 20% of the time explore randomly)
chosen_action = epsilon_greedy_action(q_table, state_key, action_space, epsilon)
print(chosen_action)

In [None]:
def get_price_movement_reward(current_price, next_price):
    """
    Turn the price change between two consecutive time steps into a reward.

    Parameters:
        current_price (float): The stock price at the current time step.
        next_price (float): The stock price at the next time step.

    Returns:
        reward (float): 1.0 if the price rose, -1.0 if it fell, and 0.0
            otherwise (unchanged price, or a difference that compares as
            neither positive nor negative).
    """
    delta = next_price - current_price

    # Price went up
    if delta > 0:
        return 1.0
    # Price went down
    if delta < 0:
        return -1.0
    # No movement
    return 0.0

In [None]:
def q_learning_update(q_table, state, action, reward, next_state, learning_rate, discount_factor):
    """
    Perform Q-learning update for the given state-action pair.

    Applies Q(s, a) <- Q(s, a) + alpha * (reward + gamma * max_a' Q(s', a') - Q(s, a)).

    Parameters:
        q_table (dict): The Q-learning table.
        state (tuple): The current state (must be a key of `q_table`).
        action (int): The chosen action, used as an index into the state's Q-value list.
        reward (float): The reward obtained from the chosen action.
        next_state (tuple): The state resulting from the chosen action.
        learning_rate (float): The learning rate (alpha) to update Q-values.
        discount_factor (float): The discount factor (gamma) for future rewards.

    Returns:
        None (the Q-table is updated in place).

    Raises:
        KeyError: If `state` is not a key of `q_table`.
    """
    current_q_value = q_table[state][action]

    # A next_state without an entry (for example the successor of the last
    # window in the data) or with an empty Q-value list is treated as terminal:
    # it contributes zero future value instead of raising KeyError/ValueError.
    max_next_q_value = max(q_table.get(next_state, ()), default=0.0)

    td_target = reward + discount_factor * max_next_q_value
    new_q_value = current_q_value + learning_rate * (td_target - current_q_value)

    # Update the Q-value for the state-action pair
    q_table[state][action] = new_q_value

In [None]:
def update_state(state, action, data, window_size):
    """
    Calculate the next state by sliding the window one step forward.

    The oldest element is dropped and `state[-1] + 1` is appended, so the
    returned state always has the same length as the input. The transition is
    the same for every action; previously the sell action sliced `state[1:-1]`,
    which also discarded the newest element and shortened the state by one.

    Parameters:
        state (tuple): The current state. Its last element is taken to be the
            index of the current data point.
        action (int): The action taken (e.g., 0 for 'Buy', 1 for 'Sell', 2 for 'Hold').
            Accepted for interface compatibility; it does not affect the result.
        data (pd.DataFrame): Historical stock price data. Not used.
        window_size (int): The number of past data points in the state. Not used.

    Returns:
        next_state (tuple): The next state resulting from the action taken.

    NOTE(review): states built by create_state_space are flattened, normalized
    feature values, so their last element is not a data index. Confirm what
    kind of state this helper is meant to receive before relying on it.
    """
    # The last element of the state is assumed to be the index of the current data point
    next_state_idx = state[-1] + 1

    # Drop the oldest entry and append the index of the next data point
    return tuple(state[1:]) + (next_state_idx,)


In [None]:
# Q-learning training pass over consecutive states.
learning_rate = 0.1  # Learning rate (alpha)
discount_factor = 0.9  # Discount factor (gamma)
window_size = 60  # Must match the window_size used to build state_space above

# The state windows were built from the rows of `data` that have no NaN values,
# so take prices from exactly those rows to keep prices and states aligned.
prices = data.dropna()['close'].values
assert len(prices) - window_size == len(state_space), \
    "state_space is not aligned with data; check window_size"

for i in range(len(state_space) - 1):
    # q_table is keyed by the state tuple, not by the row number.
    state = tuple(state_space[i])
    # The successor is simply the next window. update_state() is not used here:
    # it does index arithmetic on the state's last element and therefore cannot
    # produce keys that exist in q_table.
    next_state = tuple(state_space[i + 1])

    # Window i covers rows i .. i + window_size - 1, so its most recent price is
    # at i + window_size - 1 and the following price is at i + window_size.
    current_price = prices[i + window_size - 1]
    next_price = prices[i + window_size]

    # Calculate reward using the reward function
    # NOTE(review): the reward depends only on the price move, not on the action taken.
    reward = get_price_movement_reward(current_price, next_price)

    # Perform Q-learning updates based on the state, action, reward, and next state
    action = epsilon_greedy_action(q_table, state, action_space, epsilon)
    q_learning_update(q_table, state, action, reward, next_state, learning_rate, discount_factor)
    