In [2]:
cd ..

/Users/mac/Desktop/Work Space/indicator


In [3]:
import talib as ta
import pandas as pd

from coinmarketcap_dataloader import CoinMarketCap
from coinmarketcap_dataloader import CMC_API_KEY

In [26]:
import time 
import numpy as np
import talib as ta

from tqdm import tqdm
from typing import Tuple
from datetime import datetime
from datetime import timedelta


class Indicator(CoinMarketCap):
    PERIODS = [7, 28, 91]

    SCORE_NAMES = ['7_period_score', 
                   '28_period_score', 
                   '91_period_score', 
                   'mean_score']

    def __init__(self, API_KEY):
        super().__init__(API_KEY)

        self.tickers:list = None
        self.historical_data_dict:dict = {}

        self.update_tickers()


    def update_tickers(self):
        """
        Ticker List를 멤버변수 self.tickers에 저장
        """
        ticker_infos = self.get_all_listings()
        tickers = [info['symbol'] for info in ticker_infos]
        self.tickers = list( map(lambda x:x.upper(), tickers) )


    def update_historical_data_frame(self, tickers:list=None):
        """
        Ticker 과거 데이터를 멤버변수 self.historical_data_frame에 저장
        """

        tickers = self.tickers if tickers is None else tickers

        for ticker in tqdm(tickers):
            
            try: 
                time.sleep(0.5)
                self.historical_data_dict.update({ticker:self.get_ohlcv_n(ticker)})

            except:
                continue

    def get_total_score(self, tickers:list) -> dict:
        """
        입력 ticker들에 대한 total score list를 리턴
        """

        tickers = set(tickers).intersection(
            set(self.historical_data_dict.keys())
        )

        scoring_func = [self.get_RSI_score,
                        self.get_MACD_score,
                        self.get_OBV_score,
                        self.get_STOCH_score,
                        self.get_BBANDS_score]

        score_array = np.array([[func(ticker) for func in scoring_func] for ticker in tqdm(tickers)])

        score_n1 = self._get_normalize( np.sum(score_array[:,:,0], axis=-1) )[:,np.newaxis]
        score_n2 = self._get_normalize( np.sum(score_array[:,:,1], axis=-1) )[:,np.newaxis]
        score_n3 = self._get_normalize( np.sum(score_array[:,:,2], axis=-1) )[:,np.newaxis]
        score_mean = np.mean( np.concatenate([score_n1, score_n2, score_n3], axis=-1), axis=-1 )[:,np.newaxis]
        score_all = np.concatenate( [score_n1, score_n2, score_n3, score_mean], axis=1 )
        score_all = np.round(score_all, decimals=3)

        score_dict = {ticker: {name:score for name, score in zip(self.SCORE_NAMES, scores)} \
                    for ticker, scores in zip(tickers, score_all)}

        return score_dict


    def get_RSI_score(self, ticker:str) -> Tuple[int, int, int]:
        """
        RSI
        """

        def score_func(rsi):
            if rsi < 30: 
                return -1
            elif rsi > 70:
                return 1
            else: 
                return 0

        n1, n2, n3 = 7, 28, 91

        time.sleep(0.5)
        data = self._get_price_to_now(ticker, ['close'])

        rsi_series_n1 = ta.RSI(data['close'].to_numpy(), n1)
        rsi_series_n2 = ta.RSI(data['close'].to_numpy(), n2)
        rsi_series_n3 = ta.RSI(data['close'].to_numpy(), n3)

        rsi_n1 = rsi_series_n1[-1]
        rsi_n2 = rsi_series_n2[-1]
        rsi_n3 = rsi_series_n3[-1]

        rsi_n1_score = score_func(rsi_n1)
        rsi_n2_score = score_func(rsi_n2)
        rsi_n3_score = score_func(rsi_n3)
    
        return [rsi_n1_score, rsi_n2_score, rsi_n3_score]
    

    def get_MACD_score(self, ticker:str) -> Tuple[int, int, int]:
        """
        MACD 
        """
        def score_func(macd):
            if macd > 0: 
                return 1
            else:
                return -1

        n1, n2, n3 = self.PERIODS

        time.sleep(0.5)
        data = self._get_price_to_now(ticker, ['close'])

        macd_series_n1, _, _ = ta.MACD(data['close'].to_numpy(), fastperiod=12, slowperiod=26, signalperiod=n1)
        macd_series_n2, _, _ = ta.MACD(data['close'].to_numpy(), fastperiod=12, slowperiod=26, signalperiod=n2)
        macd_series_n3, _, _ = ta.MACD(data['close'].to_numpy(), fastperiod=12, slowperiod=26, signalperiod=n3)
        
        macd_n1 = macd_series_n1[-1]
        macd_n2 = macd_series_n2[-1]
        macd_n3 = macd_series_n3[-1]

        macd_n1_score = score_func(macd_n1)
        macd_n2_score = score_func(macd_n2)
        macd_n3_score = score_func(macd_n3)

        return [macd_n1_score, macd_n2_score, macd_n3_score]


    def get_BBANDS_score(self, ticker:str) -> Tuple[int, int, int]:
        """
        BBANDS 
        """
        def score_func(upper, lower, price):
            if price < lower:
                return -1
            elif price > upper:
                return 1
            else:
                return 0

        n1, n2, n3 = self.PERIODS

        time.sleep(0.5)
        data = self._get_price_to_now(ticker, ['close'])

        upper_band_n1, _, lower_band_n1 = ta.BBANDS(data['close'].to_numpy(), timeperiod=n1, nbdevup=1, nbdevdn=1)
        upper_band_n2, _, lower_band_n2 = ta.BBANDS(data['close'].to_numpy(), timeperiod=n2, nbdevup=1, nbdevdn=1)
        upper_band_n3, _, lower_band_n3 = ta.BBANDS(data['close'].to_numpy(), timeperiod=n3, nbdevup=1, nbdevdn=1)
        
        upper_n1, lower_n1 = upper_band_n1[-1], lower_band_n1[-1]
        upper_n2, lower_n2 = upper_band_n2[-1], lower_band_n2[-1]
        upper_n3, lower_n3 = upper_band_n3[-1], lower_band_n3[-1]

        bband_n1_score = score_func(upper_n1, lower_n1, data['close'][-1])
        bband_n2_score = score_func(upper_n2, lower_n2, data['close'][-1])
        bband_n3_score = score_func(upper_n3, lower_n3, data['close'][-1])

        return [bband_n1_score, bband_n2_score, bband_n3_score]
    
    
    def get_STOCH_score(self, ticker:str) -> Tuple[int, int, int]:
        """
        STOCH
        """

        def score_func(slowk):
            if slowk < 20:
                return -1
            elif slowk > 80:
                return 1
            else:
                return 0

        n1, n2, n3 = self.PERIODS

        time.sleep(0.5)
        data = self._get_price_to_now(ticker, ['high', 'low', 'close'])

        slowk_series_n1, _ = ta.STOCH(data['high'].to_numpy(), data['low'].to_numpy(), data['close'].to_numpy(), 
                                fastk_period=n1, slowk_period=3, slowd_period=3)
        
        slowk_series_n2, _ = ta.STOCH(data['high'].to_numpy(), data['low'].to_numpy(), data['close'].to_numpy(), 
                                fastk_period=n2, slowk_period=3, slowd_period=3)
        
        slowk_series_n3, _ = ta.STOCH(data['high'].to_numpy(), data['low'].to_numpy(), data['close'].to_numpy(), 
                                fastk_period=n3, slowk_period=3, slowd_period=3)
        
        slowk_n1 = slowk_series_n1[-1]
        slowk_n2 = slowk_series_n2[-1]
        slowk_n3 = slowk_series_n3[-1]

        slowk_n1_score = score_func(slowk_n1)
        slowk_n2_score = score_func(slowk_n2)
        slowk_n3_score = score_func(slowk_n3)

        return [slowk_n1_score, slowk_n2_score, slowk_n3_score]


    def get_OBV_score(self, ticker:str) -> Tuple[int, int, int]:
        """
        OBV
        """

        def score_func(obv):
            if obv > 0:
                return 1
            else:
                return -1

        n1, n2, n3 = self.PERIODS

        time.sleep(0.5)
        data = self._get_price_to_now(ticker, ['close', 'volume'])

        obv_series_n1 = ta.OBV(data['close'].to_numpy(), data['volume'].to_numpy())
        obv_series_n2 = ta.OBV(data['close'].to_numpy(), data['volume'].to_numpy())
        obv_series_n3 = ta.OBV(data['close'].to_numpy(), data['volume'].to_numpy())

        obv_n1 = obv_series_n1[-1]
        obv_n2 = obv_series_n2[-1]
        obv_n3 = obv_series_n3[-1]

        obv_n1_score = score_func(obv_n1)
        obv_n2_score = score_func(obv_n2)
        obv_n3_score = score_func(obv_n3)

        return [obv_n1_score, obv_n2_score, obv_n3_score]


    def get_ohlcv_n(self, ticker:str, n=100) -> pd.DataFrame:
        """ 
        현재 시점으로부터 과거 n개의 ohlcv 데이터를 리턴
        """

        date_ago = datetime.now() - timedelta(days = n + 1)
        historical_info = self.get_ohlcv_historical(ticker, start=date_ago)
        dataframe = pd.DataFrame([info['quote']['USD'] for info in historical_info])
        dataframe = dataframe.set_index('timestamp', drop=True)
        return dataframe
    

    def _get_price_to_now(self, ticker:str, types:list = ['close']) -> pd.DataFrame:
        """
        과거 데이터 + 현재 시점까지의 가격 및 거래량 데이터를 리턴
        """

        # 전일까지의 price dataframe
        historical_df = self.historical_data_dict[ticker]
    
        try:
            # 현재시점의 price dataframe
            realtime_df = self.get_ohlcv_realtime([ticker])
            realtime_df = pd.DataFrame([realtime_df[ticker][0]['quote']['USD']])
            realtime_df.set_index('last_updated', inplace=True)
            # price series  
            price = pd.concat([historical_df[types], realtime_df[types]])
        
        except:
            price = historical_df[types] 

        return price
    
    def _get_normalize(self, array:np.array, upper:int=5, lower:int=-5):
        """
        Min-Max normalize으로 upper, lower bounding
        """
        k = upper - lower
        d = -lower

        new_array = array.copy()
        max_score = np.max(new_array)
        min_score = np.min(new_array)

        if len(array) == 1:
            return new_array
        
        if max_score == min_score:
            return new_array
    
        new_array = k * (new_array - min_score) / (max_score - min_score) - d
        return new_array

In [27]:
indicator = Indicator(CMC_API_KEY)

In [28]:
tickers = indicator.tickers[:50]

In [29]:
indicator.update_historical_data_frame(tickers)

100%|██████████| 50/50 [01:29<00:00,  1.80s/it]


In [30]:
len(indicator.historical_data_dict.keys())

50

In [31]:
score_dict = indicator.get_total_score(tickers)
score_dict 

 38%|███▊      | 19/50 [01:23<02:07,  4.11s/it]