In [23]:
#Stochastic Oscillator

In [24]:
import numpy as np
import pandas as pd
import ta
import matplotlib.pyplot as plt
import vnstock as vn

companies =[
    'SSI', 'BCM','VHM','VIC','VRE','BVH','POW','GAS','ACB','BID',
'CTG','HDB','MBB','SSB','SHB','STB','TCB','TPB','VCB','VIB','VPB','HPG',
'GVR','MSN','VNM','SAB','VJC','MWG','PLX','FPT']

In [25]:
# Parameters 
RSI_PERIOD = 14
RSI_OVERSOLD = 30
RSI_OVERBOUGHT = 70
STOCH_PERIOD = 14
STOCH_OVERSOLD = 20
STOCH_OVERBOUGHT = 80
initial_investment = 160_000_000

In [26]:
def calculate_indicators(df):
    if df.empty:
        return df
    
    df['RSI'] = ta.momentum.RSIIndicator(df['close'], RSI_PERIOD).rsi()
    
    # Calculate Stochastic Oscillator
    stoch = ta.momentum.StochasticOscillator(df['high'], df['low'], df['close'], STOCH_PERIOD)
    df['%K'] = stoch.stoch()
    df['%D'] = stoch.stoch_signal()

    df = df.dropna().reset_index(drop=True)
    
    return df

In [27]:
def generate_signals(df):
    if df.empty:
        return df
    
    # Initialize Signal column
    df['Signal'] = 0

    # Define constants
    RSI_OVERSOLD = 30
    RSI_OVERBOUGHT = 70
    STOCH_OVERSOLD = 20
    STOCH_OVERBOUGHT = 80

    # Buy signals: RSI in oversold and %K above %D in oversold region
    df.loc[(df['RSI'] < RSI_OVERSOLD) & 
           (df['%K'] < STOCH_OVERSOLD) & 
           (df['%K'] > df['%D']), 'Signal'] = 1

    # Sell signals: RSI in overbought and %K below %D in overbought region
    df.loc[(df['RSI'] > RSI_OVERBOUGHT) & 
           (df['%K'] > STOCH_OVERBOUGHT) & 
           (df['%K'] < df['%D']), 'Signal'] = -1

    return df


In [28]:
def backtest_strategy(df, initial_balance=160000000):
    if df.empty:
        return df
    
    balance = initial_balance
    shares = 0
    df['Portfolio Value'] = balance
    df['Shares'] = 0


    for i in range(1, len(df)):
        if df['Signal'].iloc[i] == 1:  # Buy signal
            if balance > 0: 
                shares_to_buy = balance // df['close'].iloc[i]
                balance -= shares_to_buy * df['close'].iloc[i]
                shares += shares_to_buy


        elif df['Signal'].iloc[i] == -1 and shares > 0:  # Sell signal
            balance += shares * df['close'].iloc[i]
            shares = 0

        df.loc[i, 'Portfolio Value'] = balance + shares * df['close'].iloc[i]
        df.loc[i, 'Shares'] = shares

    df.loc[len(df) - 1, 'Portfolio Value'] = balance + shares * df['close'].iloc[-1]

    return df

In [29]:
def calculate_profit_total(df, initial_balance=160000000):
    if df.empty:
        return 0
    final_balance = df['Portfolio Value'].iloc[-1] 
    total_change = ((final_balance - initial_balance)/initial_balance) * 100
    return total_change


In [30]:
def test_company(symbol, start_date, end_date, resolution='1D', type='stock'):
    df = vn.stock_historical_data(symbol=symbol, 
                            start_date=start_date, 
                            end_date=end_date, resolution=resolution, type=type)
    if df.empty:
        return 0 
    df = calculate_indicators(df)
    df = generate_signals(df)
    df = backtest_strategy(df)
    total_profit_change = calculate_profit_total(df, initial_balance=160000000)
        
    return total_profit_change

In [31]:
START_DATE = '2021-01-01'
END_DATE = '2024-01-01'

In [32]:
profit_all = {}

for company in companies:
    try:
        company_data_all = test_company(company, start_date=START_DATE, end_date=END_DATE, resolution='1D', type='stock')
       
        # Update dictionaries
        profit_all[company] = company_data_all

    except Exception as e:
        print(f"Error occurred for {company}: {e}")

In [33]:
profit_all_df = pd.DataFrame.from_dict(profit_all, orient='index', columns=['Profit During all period']).reset_index()
final_result = profit_all_df.rename(columns={'index': 'Company'})
final_result.to_csv('result/rsi_storch.csv', index=False)