In [53]:
import numpy as np
import pandas as pd
from scripts import  CompanyStockPriceScraper, IndexStockPriceScraper

## WIG ODZIEŻ
W tym notatniku realizowana jest analiza spółek należących wyłącznie do sektora WIG_ODZIEŻ:
- CCC
- CDL
- EAH
- HRP
- IPO
- LBW
- LPP
- MIR
- MON
- PRT
- SFG
- SNW
- WTN

In [73]:
from sklearn.linear_model import LinearRegression
from enum import Enum

class ForecastModelSelector:
    """
    Class compare three models in linear regression 
    1. Model base on WIG_SECTOR
    2. Model base on WIG_POLAND
    3. Model base on WIG_SECTOR and WIG_POLAND
    """
    class ModelType(Enum):
        SECTOR = 1,
        POLAND = 2,
        SEC_AND_POL = 3,

    def __init__(self) -> None:
        self.__N_DAYS = 30
        self._isp = IndexStockPriceScraper()
        self._csp = CompanyStockPriceScraper()
        self._lr= LinearRegression(n_jobs=-1)

    def __designationError(self, ticker: str, pivot_date: np.datetime64, model_type: ModelType) -> np.ndarray:
        samples = self.__getSamples(ticker, pivot_date, model_type)
        self._lr.fit(samples["x_before"], samples["y_before"])

        y_before_pred = self._lr.predict(samples["x_before"])
        y_after_pred = self._lr.predict(samples["x_after"])

        y_pred = np.concat([y_before_pred, y_after_pred])
        y_true = np.concat([samples["y_before"], samples["y_after"]])
        
        return np.abs(y_pred - y_true) / y_true * 100
    
    def __getSamples(self, ticker: str, pivot_date: np.datetime64, model_type: ModelType) -> dict:
        samples = dict()
        match model_type:
            case ForecastModelSelector.ModelType.SECTOR:
                samples["x_before"] = self._isp.getNHistoricalPriceBefore(ticker, pivot_date, 0, self.__N_DAYS)["Close"].to_numpy().reshape(-1,1)
                samples["y_before"] = self._csp.getNHistoricalPriceBefore(ticker, pivot_date, 0, self.__N_DAYS)["Close"].to_numpy().reshape(-1,1)
                samples["x_after"]  = self._isp.getNHistoricalPriceAfter(ticker, pivot_date, 0, self.__N_DAYS)["Close"].to_numpy().reshape(-1,1)
                samples["y_after"]  = self._csp.getNHistoricalPriceAfter(ticker, pivot_date, 0, self.__N_DAYS)["Close"].to_numpy().reshape(-1,1)
            case ForecastModelSelector.ModelType.POLAND:
                samples["x_before"] = self._isp.getNHistoricalPriceBefore("WIG_POLAND", pivot_date, 0, self.__N_DAYS)["Close"].to_numpy().reshape(-1,1)
                samples["y_before"] = self._csp.getNHistoricalPriceBefore("WIG_POLAND", pivot_date, 0, self.__N_DAYS)["Close"].to_numpy().reshape(-1,1)
                samples["x_after"]  = self._isp.getNHistoricalPriceAfter("WIG_POLAND", pivot_date, 0, self.__N_DAYS)["Close"].to_numpy().reshape(-1,1)
                samples["y_after"]  = self._csp.getNHistoricalPriceAfter("WIG_POLAND", pivot_date, 0, self.__N_DAYS)["Close"].to_numpy().reshape(-1,1)
            case ForecastModelSelector.ModelType.SEC_AND_POL:
                samples["x_before"] = np.hstack([self._isp.getNHistoricalPriceBefore(ticker, pivot_date, 0, self.__N_DAYS)["Close"].to_numpy().reshape(-1,1),
                                                 self._isp.getNHistoricalPriceBefore("WIG_POLAND", pivot_date, 0, self.__N_DAYS)["Close"].to_numpy().reshape(-1,1)])
                samples["y_before"] = np.hstack([self._csp.getNHistoricalPriceBefore(ticker, pivot_date, 0, self.__N_DAYS)["Close"].to_numpy().reshape(-1,1),
                                                 self._csp.getNHistoricalPriceBefore("WIG_POLAND", pivot_date, 0, self.__N_DAYS)["Close"].to_numpy().reshape(-1,1)])
                samples["x_after"]  = np.hstack([self._isp.getNHistoricalPriceAfter(ticker, pivot_date, 0, self.__N_DAYS)["Close"].to_numpy().reshape(-1,1),
                                                 self._isp.getNHistoricalPriceAfter("WIG_POLAND", pivot_date, 0, self.__N_DAYS)["Close"].to_numpy().reshape(-1,1)])
                samples["y_after"]  = np.hstack([self._csp.getNHistoricalPriceAfter(ticker, pivot_date, 0, self.__N_DAYS)["Close"].to_numpy().reshape(-1,1),
                                                 self._csp.getNHistoricalPriceAfter("WIG_POLAND", pivot_date, 0, self.__N_DAYS)["Close"].to_numpy().reshape(-1,1)])
        return samples

    def __calculatePivotDate(self, d1: np.datetime64, d2: np.datetime64) -> np.datetime64:
        pivot_day_not_checking = d1 + (d2 - d1) / 2
        # Check is a stock market day
        wig_poland = self._isp.getHistoricalPrice("WIG_POLAND")
        pivot_day = wig_poland.index[np.sum(wig_poland.index <= pivot_day_not_checking)]
        return pivot_day

    def getJoinDateRange(self, ticker : str) -> list[np.datetime64]:
        wig_poland = self._isp.getHistoricalPrice("WIG_POLAND")
        wig_sector = self._isp.getHistoricalPrice(ticker)
        company    = self._csp.getHistoricalPrice(ticker)
        # Find first join date
        first_join_date = np.max([wig_poland.head(1).index[0], 
                                  wig_sector.head(1).index[0], 
                                  company.head(1).index[0]])
        # Find last join date
        last_join_date = np.max([wig_poland.tail(1).index[0], 
                                 wig_sector.tail(1).index[0], 
                                 company.tail(1).index[0]])
        return [first_join_date, last_join_date]

    def foo(self, ticker: str, reports_date: np.ndarray[np.datetime64]):
        stock_date_range = self.getJoinDateRange(ticker)
        begin_idx = np.sum(reports_date < stock_date_range[0])
        end_idx   = np.sum(reports_date < stock_date_range[1])

        errors = np.array([], dtype=np.int64).reshape(self.__N_DAYS*2,0)
        for i in range(begin_idx, end_idx-1):
            pivot_date = self.__calculatePivotDate(reports_date[i], reports_date[i+1])
            errors = np.hstack([errors, self.__designationError(ticker, pivot_date, 
                                                                ForecastModelSelector.ModelType.POLAND)])

        return errors.T

        
    


    
# Build the selector and compute the percentage forecast errors for CCC.
# NOTE(review): `x` (the CCC report dates) is assigned in a later cell of this
# notebook, so this cell only works after that cell has been executed.
fms = ForecastModelSelector()
z = fms.foo("CCC", x)
# Display the shape of the returned error array.
z.shape

$WIG_POLAND.WA: possibly delisted; no timezone found


KeyError: 'Close'

In [71]:
# Standard deviation of `z` along axis 0.
# NOTE(review): presumably `z` is the error array returned by `fms.foo`, which
# would make this the per-column spread of the errors - confirm.
z.std(axis=0)

array([1.88037029, 0.53530109, 2.07340834, 1.73916714, 1.05566863,
       1.13342065, 1.55881749, 1.3671797 , 0.80650032, 1.20934667,
       1.6850066 , 3.21910616, 3.22539923, 1.44412879, 0.93257898,
       1.49009616, 1.09760117, 0.75327333, 1.7871999 , 1.39432187,
       1.16181468, 1.21339496, 2.64085239, 2.87779064, 1.28701956,
       1.29249735, 0.83205288, 1.30722216, 1.84587751, 1.75983664,
       1.75983664, 2.27566192, 2.36757618, 3.94228212, 3.5175662 ,
       3.39612377, 3.21562799, 3.80538723, 4.33920751, 5.93698543,
       7.42506369, 6.6391338 , 7.01536002, 7.67745279, 9.81351072,
       9.44181797, 5.58153265, 5.26046546, 5.43995424, 5.17996943,
       3.16749129, 3.6557006 , 4.87953134, 5.02296248, 3.16926606,
       4.58919837, 6.04192462, 6.25108977, 6.51853146, 6.7574897 ])

In [3]:
# Tickers of the companies analysed in this notebook (WIG_ODZIEŻ sector).
tickers_wig_odziez = [
    "CCC",
    "CDL",
    "EAH",
    "HRP",
    "IPO",
    "LBW",
    "LPP",
    "MIR",
    "MON",
    "PRT",
    "SFG",
    "SNW",
    "WTN",
]



In [39]:
# Load the merged annual-report table; the first CSV column becomes the index.
report_database = pd.read_csv("../database/mergedData/Annual_V.csv", index_col=[0])

# "Data" values of the rows whose "Ticker" is CCC, each converted to numpy.datetime64.
x = np.array([
    np.datetime64(raw_date)
    for raw_date in report_database.loc[report_database["Ticker"] == "CCC", "Data"].to_numpy()
])

In [40]:
# Display the CCC report dates built in the previous cell.
x

array(['2005-10-03', '2006-05-15', '2007-06-05', '2008-04-30',
       '2009-04-30', '2010-04-30', '2011-04-22', '2012-04-27',
       '2013-04-30', '2014-04-30', '2015-04-30', '2016-04-29',
       '2017-04-28', '2018-03-26', '2019-03-14', '2020-03-06',
       '2021-05-18', '2022-04-20', '2023-04-17'], dtype='datetime64[D]')

In [102]:
# Scratch check: joining two 1-D arrays end to end.
# np.concatenate is used because np.concat exists only in NumPy >= 2.0.
# NOTE(review): this rebinds `z`, which another cell uses for the forecast errors.
z = np.array([10, 20])
y = np.array([10, 20])
np.concatenate([z, y])

array([10, 20, 10, 20])