In [1]:
cd ..

D:\J.H.LEE\05. CODING\Python\Project\tdatlib


In [3]:
# For Library
import plotly.graph_objects as go
import pandas as pd
import numpy as np
from plotly.subplots import make_subplots
from datetime import timedelta

# For Testing
import tdatlib as tdat
from IPython.display import display, HTML
display(HTML("<style>.container { width:80% !important;}</style>"))

# LIB DEV

## Functionality

In [4]:
def align(l:pd.Series or pd.DataFrame, r:pd.Series or pd.DataFrame):
    """
    Align two time series on a common index, using the left series as reference.

    Both inputs are placed side by side under the keys ``'L'`` and ``'R'``,
    every row with a missing value on either side is discarded, and only rows
    whose index label occurs in ``l`` are kept.

    :param l: Left
    :param r: Right
    :return: tuple ``(left, right)`` holding the aligned data; both share one index
    """
    paired = pd.concat(objs={'L': l, 'R': r}, axis=1)
    complete = paired.dropna()
    on_left_index = complete.index.isin(l.index)
    aligned = complete[on_left_index]
    return aligned['L'], aligned['R']


def normalize(time_series:pd.Series or np.array, a:float=0, b:float=1) -> pd.Series:
    """
    Min-max normalization: rescale the data linearly so that its minimum maps
    to ``a`` and its maximum maps to ``b``.

    The result has the same container type as the input (a pandas object keeps
    its index; a numpy array stays a numpy array).

    :param time_series: time-series data
    :param a: lower bound of the normalized range
    :param b: upper bound of the normalized range
    :return: rescaled data; when every value is identical (zero range) each
             value maps to ``a`` instead of producing NaN from a 0/0 division
    """
    lowest = time_series.min()
    span = time_series.max() - lowest

    # Constant input: the usual formula would divide by zero. Only the scalar
    # case is guarded so that column-wise use on a DataFrame is left untouched.
    if np.ndim(span) == 0 and span == 0:
        # `* 0.0` keeps the container/index (and any NaN) while forcing floats.
        return (time_series - lowest) * 0.0 + a

    return (b - a) * (time_series - lowest) / span + a


def typify(ohlcv:pd.DataFrame) -> pd.Series:
    """
    Typical price: the equally weighted average of the close, high and low columns.

    :param ohlcv: price frame; must contain the columns '종가' (close),
                  '고가' (high) and '저가' (low)
    :return: row-wise (close + high + low) / 3, computed as a sum of thirds
    :raises KeyError: when any required column is absent; the message lists
                      the missing column names
    """
    required = ('종가', '고가', '저가')
    missing = [col for col in required if col not in ohlcv.columns]
    if missing:
        raise KeyError(f"typify() requires columns {list(required)}; missing: {missing}")
    return (1/3) * ohlcv['종가'] + (1/3) * ohlcv['고가'] + (1/3) * ohlcv['저가']


def corrcoeff(l:pd.Series, r:pd.Series) -> float:
    """
    Pearson correlation coefficient of two series after aligning them.

    The inputs are first passed through ``align``; the coefficient is then read
    from the off-diagonal cell of the pairwise correlation matrix.

    :param l: Left
    :param r: Right
    :return: correlation between the aligned left and right data
    """
    left, right = align(l, r)
    paired = pd.concat(objs=[left, right], axis=1)
    matrix = paired.corr(method='pearson', min_periods=1)
    return matrix.iloc[0, 1]


def weighted_corrcoeff(joined:pd.DataFrame) -> float:
    """
    Correlation averaged over several trailing time windows.

    The correlation between the first two columns of ``joined`` is computed on
    the trailing 92, 183, 365, 730, 1095, 1825 and 3650 calendar days — using
    only the windows that are strictly shorter than the data's total span — and
    once more on the full data. The plain mean of those coefficients is
    returned, so recent behaviour contributes to more terms than old behaviour.

    Spans longer than the largest window (or of zero days) are handled: every
    applicable window is used, and the full-range coefficient is always included.

    :param joined: aligned time-series data (Left, Right) with a datetime index
    :return: mean of the per-window correlation coefficients
    """
    last_day = joined.index[-1]
    days = (last_day - joined.index[0]).days
    gaps = [92, 183, 365, 365 * 2, 365 * 3, 365 * 5, 365 * 10]

    # A window that is at least as long as the whole span would just repeat the
    # full-range coefficient appended below, so it is skipped.
    coeffs = [
        joined[joined.index >= (last_day - timedelta(g))].corr().iloc[0, 1]
        for g in gaps if g < days
    ]
    coeffs.append(joined.corr().iloc[0, 1])
    return np.array(coeffs).mean()


def corr_rolling(l:pd.Series, r:pd.Series, month:int) -> pd.DataFrame:
    """
    Lagged-correlation table: correlation of ``l`` against ``r`` shifted by a
    range of offsets.

    The offset range is derived from ``month``: the number of ``r`` observations
    falling within roughly ``month`` months (30.5 days each) before the last
    date of ``l`` sets the largest shift, and offsets run from minus to plus
    that count in steps of 5 rows.

    :param l: Left
    :param r: Right
    :param month: shift range (months)
    :return: frame with one row per offset; column 'corrcoef' holds the
             correlation coefficient for that offset and column 'days' the
             offset itself. The index is the last date of ``r`` moved by the
             offset interpreted as calendar days.
    """
    prev_day = l.index[-1] - timedelta(days=int(month * 30.5))
    samples = len(r.index[r.index >= prev_day])
    # Plain ints keep timedelta() and Series.shift() independent of numpy scalar handling.
    steps = [int(i) for i in np.arange(start=-samples, stop=samples + 1, step=5)]

    # NOTE(review): the offset is applied to `r` as a row shift but converted to
    # calendar days for the index; the two coincide only for daily data without gaps.
    dates = [r.index[-1] + timedelta(i) for i in steps]
    # Use the scalar helper `corrcoeff`; calling the `corr` class here would store
    # model objects in the 'corrcoef' column instead of numbers.
    data = [[corrcoeff(l, r.shift(i)), i] for i in steps]
    return pd.DataFrame(data=data, index=dates, columns=['corrcoef', 'days'])


class corr(object):
    """
    Correlation model for a pair of time series.

    The two inputs are aligned with ``align`` when the object is created. The
    statistics (``coeff``, ``coeffw``, ``coeffr``) are computed on first access
    and cached on the instance; the ``trace_*`` properties build Plotly traces
    from the aligned data.
    """

    def __init__(self, l:pd.Series, r:pd.Series, l_name:str=str(), r_name:str=str()):
        """
        :param l: Left
        :param r: Right
        :param l_name: legend label for the left series ('Left' when empty)
        :param r_name: legend label for the right series ('Right' when empty)
        """
        self._l, self._r = align(l=l, r=r)
        self._j = pd.concat(objs=[self._l, self._r], axis=1)
        self._lname = l_name if l_name else 'Left'
        self._rname = r_name if r_name else 'Right'

    def _coeffr(self) -> tuple:
        """
        Find the shift of the right series giving the strongest correlation.

        Uses ``corr_rolling`` with a 6-month range and picks the first row whose
        absolute coefficient equals the maximum. The result is cached.

        :return: tuple ``(coefficient, gap, step)`` where ``step`` is the chosen
                 row offset and ``gap`` is the difference between two index
                 labels of the right series separated by that offset
        """
        # The cache key is a string literal, so it is NOT name-mangled; it is
        # stored and read back under the exact attribute name '__coeffr'.
        if not hasattr(self, '__coeffr'):
            rolling = corr_rolling(l=self._l, r=self._r, month=6)
            rolling['abs'] = rolling['corrcoef'].abs()
            fitted = rolling[rolling['abs'] == rolling['abs'].max()]
            coeff, step = fitted.iloc[0, 0], fitted.iloc[0, 1]
            if step < 0:
                gap = self._r.index[-1] - self._r.index[step - 1]
            else:
                gap = self._r.index[0] - self._r.index[step]
            setattr(self, '__coeffr', (coeff, gap, step))
        return getattr(self, '__coeffr')

    @staticmethod
    def _series_trace(series:pd.Series, name:str) -> go.Scatter:
        """Build the always-visible scatter trace shared by ``trace_l`` and ``trace_r``."""
        return go.Scatter(
            x=series.index, y=series, name=name,
            visible=True, showlegend=True,
            xhoverformat='%Y/%m/%d', hovertemplate='%{x}<br>%{y}'
        )

    @property
    def coeff(self) -> float:
        """
        Correlation coefficient over the whole aligned series.
        """
        if not hasattr(self, '_coeff'):
            self._coeff = self._j.corr().iloc[0, 1]
        return self._coeff

    @property
    def coeffw(self) -> float:
        """
        Window-averaged correlation coefficient (see ``weighted_corrcoeff``).

        Falls back to ``coeff`` when fewer than 93 aligned rows are available.
        """
        if not hasattr(self, '_coeffw'):
            self._coeffw = self.coeff if len(self._j) < 93 else weighted_corrcoeff(self._j)
        return self._coeffw

    @property
    def coeffr(self) -> float:
        """
        Strongest shifted correlation coefficient found by ``_coeffr``.
        """
        coeff, _, __ = self._coeffr()
        return coeff

    @property
    def trace_l(self) -> go.Scatter:
        """Trace of the aligned left series."""
        return self._series_trace(self._l, self._lname)

    @property
    def trace_r(self) -> go.Scatter:
        """Trace of the aligned right series."""
        return self._series_trace(self._r, self._rname)

    @property
    def trace_rshift(self) -> go.Scatter:
        """Dotted trace of the right series shifted by the best offset; hidden until toggled in the legend."""
        _, _, step = self._coeffr()
        shifted = self._r.shift(step)
        return go.Scatter(
            x=shifted.index, y=shifted, name=f'{self._rname}<br>shifted',
            # Plotly's mode flags are 'lines' / 'markers' / 'text'; 'line' is rejected.
            visible='legendonly', showlegend=True, mode='lines', line=dict(dash='dot'),
            xhoverformat='%Y/%m/%d', hovertemplate='%{x}<br>%{y}'
        )

    @property
    def trace_corr(self) -> go.Scatter:
        """Marker trace plotting the left values (x) against the right values (y)."""
        # NOTE(review): `meta` is given the whole index while the hover template
        # reads `%{meta}`; confirm this shows one date per point — per-point
        # values are normally supplied through `customdata` or `text`.
        return go.Scatter(
            x=self._l, y=self._r, name='산포도',
            meta=self._j.index, mode='markers',
            hovertemplate='%{meta}<br>x = %{x}<br>y = %{y}<extra></extra>'
        )

# TESTER

## BASIC DATASET

### Handler

In [5]:
# Settings shared by every data handler in this notebook.
period = 5          # look-back window given to each handler; the unit is defined by tdatlib (presumably years — confirm)
ticker = '105560'   # code of the stock under study

# Macro-economic sources and the market-index source.
ecos, fred, index = tdat.macro.ecos(), tdat.macro.fred(), tdat.market.index()
ecos.period = period
fred.period = period
index.period = period

# Single-stock source for the chosen ticker, using the same look-back window.
stock = tdat.stock.kr(ticker=ticker)
stock.period = period

### Indicator

In [6]:
# Price columns (open, high, low, close) used when slicing OHLC frames.
cols = ['시가', '고가', '저가', '종가']

# Market indices.
kospi = index.kospi
krbank = index.bank

# Macro indicators: KRW/USD exchange rate, and the difference between two ECOS series.
# NOTE(review): presumably a lending-minus-deposit spread; confirm against the
# ECOS definitions of tables 121Y015 and 121Y013.
exchange = ecos.원달러환율
lending = ecos.load('121Y015', '총대출(당좌대출 제외)')
funding = ecos.load('121Y013', '저축성수신(금융채 제외)')
margin = lending - funding

# Price history of the selected stock.
ohlcv = stock.ohlcv
# pd.concat(objs={stock.name:ohlcv[cols], '코스피':kospi[cols], '환율':exchange, '은행':krbank[cols]}, axis=1)

## TESTING DATASET

In [7]:


# Pick the pair of series to compare; the commented lines are the alternatives
# tried so far (swap the comment markers to switch inputs).
# NOTE(review): `left` is a copy of the whole `krbank` frame, while the scatter
# cell further down uses only `left.종가` — confirm whether a single column was
# intended here as well.
left = krbank.copy()
# left = ohlcv.copy()

# right = exchange.copy()
right = margin.copy()

# This uses the packaged `tdat.tools.corr`, not the `corr` class defined in this notebook.
mycorr = tdat.tools.corr(l=left, r=right)
# mycorr = corrmodel(ohlcv2hlc(left), ohlcv2hlc(right))
# mycorr = corrmodel(ohlcv2hlc(left), right)
# mycorr = corrmodel(right, ohlcv2hlc(left))

# Printed one at a time so earlier results stay visible if a later one fails;
# in the recorded run the third line raised
# "TypeError: bad operand type for abs(): 'corr'".
print(mycorr.coeff)
print(mycorr.coeffw)
print(mycorr.coeffr)

0.9988668127405339
0.9988668127405339


TypeError: bad operand type for abs(): 'corr'

## VISUALIZE

### Scatter

In [None]:
# Raw inputs: closing price of the left frame against the right series.
_x = left.종가
_y = right

# Rescale both to [-1, 1] so the two axes are comparable.
x = normalize(_x, a=-1, b=1)
y = normalize(_y, a=-1, b=1)
# x, y = _x.resample('M').last(), _y.resample('M').last()
# x = _x.resample('M').last().pct_change()
# y = _y.resample('M').last().pct_change()

# One marker per observation.
scatter = go.Scatter(x=x, y=y, name='산포도', mode='markers')

fig = go.Figure()
fig.add_trace(scatter)
fig.update_layout(height=700)
fig.show()

### x-y

In [None]:
# Overlay two curves, each derived from a closing-price column, on a single axis.
# NOTE(review): `price_rel`, `comparatee` and `comparator` are not defined in any
# cell visible in this notebook; unless they come from elsewhere this cell raises
# NameError on a fresh kernel. Confirm their source (or switch to `left` / `right`).
y1 = price_rel(comparatee.종가)
y2 = price_rel(comparator.종가)

fig = go.Figure()
# First curve, labelled LEFT in the legend.
fig.add_trace(go.Scatter(
    x=y1.index, y=y1, name='LEFT',
    xhoverformat='%Y/%m/%d'
))

# Second curve, labelled RIGHT in the legend.
fig.add_trace(go.Scatter(
    x=y2.index, y=y2, name='RIGHT',
    xhoverformat='%Y/%m/%d'
))
fig.update_layout(height=700)
fig.show()

### x-y1, y2

In [None]:
# Two-panel figure: both series on a shared x axis with separate y axes (row 1),
# and a correlation-versus-shift curve (row 2).
y1 = left.종가
# y2 = right.종가
y2 = right

# NOTE(review): plotly's `row_width` is understood to be ordered bottom-to-top,
# which would make 0.3 the lower panel and 0.7 the upper one — confirm.
fig = make_subplots(
    rows=2, cols=1, row_width=[0.3, 0.7], vertical_spacing=0.02, shared_xaxes=False,
    specs=[
        [{"type": "xy", "secondary_y": True}],
        [{"type": "xy"}]
    ]
)

# Left series on the primary y axis of the top panel.
fig.add_trace(go.Scatter(
    x=y1.index, y=y1, name='LEFT',
    xhoverformat='%Y/%m/%d'
), row=1, col=1, secondary_y=False)

# Right series on the secondary y axis of the top panel.
fig.add_trace(go.Scatter(
    x=y2.index, y=y2, name='RIGHT',
    xhoverformat='%Y/%m/%d'
), row=1, col=1, secondary_y=True)

# NOTE(review): `rl` is not defined in any cell visible in this notebook; its
# `days` / `corrcoef` attributes match the columns produced by `corr_rolling`,
# so it is presumably that function's result — confirm and define it here.
fig.add_trace(go.Scatter(
    x=rl.days, y=rl.corrcoef, name='Rolling'
), row=2, col=1)

fig.update_layout(height=700)
fig.show()