In [1]:
from datetime import datetime
import pandas as pd
from statsmodels.tsa.arima.model import ARIMA
from pmdarima import auto_arima
from vnstock3 import Vnstock
import pandas_ta as ta
import numpy as np
import F 

In [2]:
# Download the price history of the VN30 and VNINDEX symbols from the 'VCI'
# source, from 2023-08-01 up to the date on which this cell is executed.
# NOTE(review): `to_df=True` presumably makes `history` return a DataFrame --
# confirm against the vnstock3 documentation.
now = datetime.now()
current_date = now.strftime("%Y-%m-%d")
history_kwargs = {'start': '2023-08-01', 'end': current_date, 'to_df': True}

vn30 = Vnstock().stock(symbol='VN30', source='VCI')
df_vn30 = vn30.quote.history(**history_kwargs)

vni = Vnstock().stock(symbol='VNINDEX', source='VCI')
df_vni = vni.quote.history(**history_kwargs)



In [3]:
def process_data(data):
    """Return a time-sorted copy of ``data`` enriched with technical indicators.

    The input must provide ``time`` and ``close`` columns.  The following
    columns are added, all computed from ``close``:

    * ``sma5`` / ``sma10``  -- simple moving averages (lengths 5 and 10)
    * ``ema5`` / ``ema10``  -- exponential moving averages (lengths 5 and 10)
    * ``rsi``               -- RSI with length 14
    * the columns appended by pandas_ta's ``macd(fast=12, slow=26, signal=9)``

    Rows that contain any NaN (e.g. the indicator warm-up period, or ``close``
    values that could not be parsed as numbers) are dropped, and the result
    gets a fresh ``0..n-1`` index so positional and label-based lookups agree.

    The caller's frame is left untouched, which keeps the notebook cell that
    calls this function safe to re-run.

    Args:
        data: DataFrame with at least ``time`` and ``close`` columns.

    Returns:
        A new DataFrame sorted by ``time`` with the indicator columns added.
    """
    # Work on a copy: mutating the caller's frame made a second call drop
    # another batch of warm-up rows from already-processed data.
    data = data.copy()
    data['time'] = pd.to_datetime(data['time'])
    # reset_index returns a new frame; its result must be kept (and the old
    # labels dropped) or the index stays in its pre-sort state.
    data = data.sort_values(by='time', ascending=True).reset_index(drop=True)

    data['close'] = pd.to_numeric(data['close'], errors='coerce')

    # Simple moving averages
    data['sma5'] = ta.sma(data['close'], length=5)
    data['sma10'] = ta.sma(data['close'], length=10)

    # Exponential moving averages
    data['ema5'] = ta.ema(data['close'], length=5)
    data['ema10'] = ta.ema(data['close'], length=10)

    # Relative strength index
    data['rsi'] = ta.rsi(data['close'], length=14)

    # MACD: append=True makes the accessor add its columns to `data` itself
    data.ta.macd(close='close', fast=12, slow=26, signal=9, append=True)

    # Drop incomplete rows, then renumber so the index is contiguous again.
    return data.dropna().reset_index(drop=True)

In [4]:
# Turn both raw index histories into indicator-enriched frames.
data_VN30 = process_data(data=df_vn30)
data_VNI = process_data(data=df_vni)

In [5]:
def model_ARIMA(data):
    """Fit an ARIMA model on the ``close`` column of ``data``.

    ``auto_arima`` runs a stepwise search (with tracing enabled and warnings
    suppressed) to pick an order; an ``ARIMA`` model is then built with that
    order and fitted.

    Args:
        data: frame holding ``close`` plus every indicator column listed
            below.  Only ``close`` is modelled, but the full column set is
            selected first, so a missing column raises ``KeyError``.

    Returns:
        The object returned by ``ARIMA(...).fit()``.
    """
    required_columns = [
        'close', 'sma5', 'sma10', 'ema5', 'ema10', 'rsi',
        'MACD_12_26_9', 'MACDh_12_26_9', 'MACDs_12_26_9',
    ]
    close_prices = data[required_columns]['close']

    order_search = auto_arima(
        close_prices,
        trace=True,
        suppress_warnings=True,
        stepwise=True,
    )
    return ARIMA(close_prices, order=order_search.order).fit()

def output_ARIMA(data, trend, steps=3):
    """Return the mean ARIMA forecast of ``close`` over the next ``steps`` periods.

    The first ``trend`` rows of ``data`` form the training window.  A model is
    fitted on them via ``model_ARIMA`` and its ``steps``-ahead forecasts are
    averaged into a single number.

    Args:
        data: frame containing ``close`` and the indicator columns listed in
            ``feature_columns`` below.
        trend: number of leading rows of ``data`` to train on (passed to
            ``DataFrame.head``).
        steps: forecast horizon whose values are averaged.  Defaults to 3,
            the horizon that was previously hard-coded.

    Returns:
        ``np.mean`` of the forecast values.

    Raises:
        ValueError: if ``steps`` is smaller than 1, or if the training window
            is empty or contains NaN values.
        KeyError: if a required column is missing from ``data``.
    """
    if steps < 1:
        raise ValueError("steps must be a positive integer.")

    feature_columns = [
        'close', 'sma5', 'sma10', 'ema5', 'ema10', 'rsi',
        'MACD_12_26_9', 'MACDh_12_26_9', 'MACDs_12_26_9',
    ]
    data_features = data.head(trend)[feature_columns]

    # Fail early with a clear message instead of an opaque error from the
    # order search when there is nothing to train on.
    if data_features.empty:
        raise ValueError("Train data is empty.")
    if data_features.isnull().values.any():
        raise ValueError("Train data contains NaN values.")

    model = model_ARIMA(data_features)
    forecast = model.forecast(steps=steps)

    return np.mean(forecast)

In [6]:
def test(data, k):
    # Ensure the 'Price' column is properly referenced
    data_output = data[['time', 'close']].copy()
    data_output['position'] = 0

    # Calculate predicted price for each row starting from index k
    for i in range(k, len(data)):
        # Get data for the previous k days to predict the price for the current day
        historical_data = data.iloc[i-k:i][['close','sma5','sma10','ema5','ema10','rsi','MACD_12_26_9','MACDh_12_26_9','MACDs_12_26_9']]
        
        # Assuming output_ARIMA returns the predicted price based on previous data
        predicted_price = output_ARIMA(historical_data, k)
        
        # Calculate the price difference between the last day in the prediction data and the predicted price
        price_difference = predicted_price - data.iloc[i-1]['close']  # Make sure 'price' is correctly referenced

        # Update position based on calculated price difference
        if price_difference > 2:
            data_output.loc[i, 'position'] = 1
        elif price_difference < 2:
            data_output.loc[i, 'position'] = -1
    data_output.dropna(inplace = True)
    return data_output

In [None]:
%%capture
# Walk-forward signals for both indices, each model fitted on the previous 30 rows.
data_VNI_position = test(data_VNI, k=30)
data_VN30_position = test(data_VN30, k=30)

In [None]:
# Backtest the VN30 signals (k=30 run) and call Plot_PNL on the result.
# NOTE(review): F is a project module not shown here; Plot_PNL presumably
# draws the profit-and-loss curve -- confirm.
backtesting = F.BacktestInformation(
    *(data_VN30_position[column] for column in ('time', 'position', 'close'))
)
backtest = backtesting.Plot_PNL()

In [None]:
# Backtest the VNINDEX signals (k=30 run) and call Plot_PNL on the result.
# NOTE(review): F is a project module not shown here; Plot_PNL presumably
# draws the profit-and-loss curve -- confirm.
backtesting = F.BacktestInformation(
    *(data_VNI_position[column] for column in ('time', 'position', 'close'))
)
backtest = backtesting.Plot_PNL()

In [None]:
%%capture
# Walk-forward signals for both indices, each model fitted on the previous 15 rows.
data_VNI_position = test(data_VNI, k=15)
data_VN30_position = test(data_VN30, k=15)

In [None]:
# Backtest the VN30 signals (k=15 run) and call Plot_PNL on the result.
# NOTE(review): F is a project module not shown here; Plot_PNL presumably
# draws the profit-and-loss curve -- confirm.
backtesting = F.BacktestInformation(
    *(data_VN30_position[column] for column in ('time', 'position', 'close'))
)
backtest = backtesting.Plot_PNL()

In [None]:
# Backtest the VNINDEX signals (k=15 run) and call Plot_PNL on the result.
# NOTE(review): F is a project module not shown here; Plot_PNL presumably
# draws the profit-and-loss curve -- confirm.
backtesting = F.BacktestInformation(
    *(data_VNI_position[column] for column in ('time', 'position', 'close'))
)
backtest = backtesting.Plot_PNL()

In [None]:
%%capture
# Walk-forward signals for both indices, each model fitted on the previous 7 rows.
data_VNI_position = test(data_VNI, k=7)
data_VN30_position = test(data_VN30, k=7)

In [None]:
# Backtest the VN30 signals (k=7 run) and call Plot_PNL on the result.
# NOTE(review): F is a project module not shown here; Plot_PNL presumably
# draws the profit-and-loss curve -- confirm.
backtesting = F.BacktestInformation(
    *(data_VN30_position[column] for column in ('time', 'position', 'close'))
)
backtest = backtesting.Plot_PNL()

In [None]:
# Backtest the VNINDEX signals (k=7 run) and call Plot_PNL on the result.
# NOTE(review): F is a project module not shown here; Plot_PNL presumably
# draws the profit-and-loss curve -- confirm.
backtesting = F.BacktestInformation(
    *(data_VNI_position[column] for column in ('time', 'position', 'close'))
)
backtest = backtesting.Plot_PNL()

In [None]:
%%capture
# Walk-forward signals for both indices, each model fitted on the previous 3 rows.
data_VNI_position = test(data_VNI, k=3)
data_VN30_position = test(data_VN30, k=3)

In [None]:
# Backtest the VN30 signals (k=3 run) and call Plot_PNL on the result.
# NOTE(review): F is a project module not shown here; Plot_PNL presumably
# draws the profit-and-loss curve -- confirm.
backtesting = F.BacktestInformation(
    *(data_VN30_position[column] for column in ('time', 'position', 'close'))
)
backtest = backtesting.Plot_PNL()

In [None]:
# Backtest the VNINDEX signals (k=3 run) and call Plot_PNL on the result.
# NOTE(review): F is a project module not shown here; Plot_PNL presumably
# draws the profit-and-loss curve -- confirm.
backtesting = F.BacktestInformation(
    *(data_VNI_position[column] for column in ('time', 'position', 'close'))
)
backtest = backtesting.Plot_PNL()