In [1]:
from collections import defaultdict
from datetime import datetime, timedelta
import os
from copy import deepcopy
import yaml

from clickhouse_driver import Client


from google_sheet import write_df, write_update_time, write_data_to_gs_no_resize
from tqdm.notebook import tqdm

import pandas as pd
import numpy as np
import pygsheets
import scipy
from scipy import stats


from datetime import datetime, timedelta
import time

import logging

In [2]:
def read_yaml_config(file_name: str) -> any:
    """Parse a YAML file and return the loaded object.

    :param file_name: path of the YAML file to read.
    :return: the value produced by ``yaml.safe_load`` for the file's content.
    """
    with open(file_name) as handle:
        parsed = yaml.safe_load(handle)
    return parsed

def get_config(
        path_dir_main_config: str,
        path_sensitive_config: str
):
    """Load everything a run needs from disk.

    ``config.yaml`` and ``input.csv`` are read from ``path_dir_main_config``;
    credentials come from the YAML file at ``path_sensitive_config``.

    :return: ``(input_p, main_config, cli_creds, cg_api_key)`` where ``input_p`` is
        ``input.csv`` indexed by ``coingecko_id``, ``main_config`` the parsed
        ``config.yaml``, ``cli_creds`` the ``clickhouse`` section of the sensitive
        config and ``cg_api_key`` its ``cg-api-key`` entry.
    """
    main_config_path = os.path.join(path_dir_main_config, 'config.yaml')
    input_csv_path = os.path.join(path_dir_main_config, 'input.csv')

    with open(main_config_path, 'r') as main_stream:
        main_config = yaml.safe_load(main_stream)

    with open(path_sensitive_config, 'r') as secret_stream:
        sensitive = yaml.safe_load(secret_stream)
    cli_creds = sensitive['clickhouse']
    cg_api_key = sensitive['cg-api-key']

    input_p = pd.read_csv(input_csv_path).set_index('coingecko_id')
    return input_p, main_config, cli_creds, cg_api_key


def setup_logging(log_dir=None):
    """Configure root logging to append INFO-and-above records to ``main.log``.

    :param log_dir: directory that will hold ``main.log``. When omitted, the
        directory of this module (``__file__``) is used; inside a Jupyter kernel
        ``__file__`` is not defined, so the current working directory is used
        instead of raising ``NameError``.
    """
    if log_dir is None:
        try:
            log_dir = os.path.dirname(__file__)
        except NameError:
            # Notebook kernels have no __file__.
            log_dir = os.getcwd()
    logging.basicConfig(
        filename=os.path.join(log_dir, "main.log"),
        filemode='a',
        level=logging.INFO,
        format='%(asctime)s %(levelname)-8s %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S'
    )
    
def get_enter_prices(input_c):
    """Return the entry prices of each coin in USDT, BTC and ETH.

    :param input_c: input frame (presumably the ``input_p`` returned by
        ``get_config``, indexed by ``coingecko_id``) with columns ``price_USDT``,
        ``price_BTC`` and ``price_ETH``.
    :return: tuple of three dicts ``{index_label: price}`` in the order USDT, BTC, ETH.
    """
    # Read from the argument; the original silently used the notebook-global ``input_p``.
    buy_prices_usdt = input_c['price_USDT'].to_dict()
    buy_prices_btc = input_c['price_BTC'].to_dict()
    buy_prices_eth = input_c['price_ETH'].to_dict()
    return buy_prices_usdt, buy_prices_btc, buy_prices_eth

def get_info(input_c):
    """Return the descriptive columns of the input frame.

    :param input_c: input frame (presumably ``input_p`` from ``get_config``).
    :return: ``input_c`` restricted to ``ticker``, ``price_USDT``, ``price_BTC``,
        ``price_ETH``, ``amount_usdt`` and ``dt``, keeping its index.
    """
    # Use the argument rather than the notebook-global ``input_p``.
    return input_c[['ticker', 'price_USDT', 'price_BTC', 'price_ETH', 'amount_usdt', 'dt']]

In [3]:
# Load the run inputs: coin list, main config, ClickHouse credentials, CoinGecko key.
input_p, main_config, cli_creds, cg_api_key = get_config(
    'configs/config_5_25_03_2023/',
    'configs/sensitive.yaml',
)

# ClickHouse connection: host from the main config, credentials from the sensitive one.
cli = Client(
    host=main_config['clickhouse']['client']['host'],
    user=cli_creds['user'],
    password=cli_creds['password'],
)

# Connectivity check; the cell output is the server's current time.
cli.execute('select now();')


[(datetime.datetime(2023, 4, 12, 8, 42, 35),)]

In [4]:
def _dt_minus_days(dt: str, days: int):
    return (datetime.strptime(dt, '%Y-%m-%d') - timedelta(days=days)).strftime('%Y-%m-%d')

class DataInterface:
    """ClickHouse access layer for portfolio metrics and daily coin prices.

    :param cli: client for the private portfolio tables; all writes go through it.
    :param conf: config object providing table lookups (``get_table_by_metric``,
        ``get_prices_daily_table``, ``filter_crypto_base_coins``, ...).
    :param dt_close: candle-close date as a ``'%Y-%m-%d'`` string.
    :param cli_external: client used for price tables and for metrics that
        ``conf.is_metric_private`` reports as public.
    """

    def __init__(self, cli, conf, dt_close, cli_external=None):
        self.cli = cli
        self.cli_external = cli_external
        self.conf = conf
        self.dt_close = dt_close

    def get_metric_dt_candle_close(self, metric):
        '''
        Read ``metric`` for every coin on ``self.dt_close``.

        return pd.DataFrame
        index - cg_id
        columns = [{metric}] with value of aim metric

        If a ``coingecko_id`` occurs more than once, a warning is logged and only
        its first row is kept.
        '''
        table = self.conf.get_table_by_metric(metric)
        q_get = """
            SELECT coingecko_id, value
            FROM {t} 
            WHERE 
                (toDate(dt_candle_close) = '{dt}')
                AND 
                (metric = '{metric}')
        """.format(
            t=table,
            dt=self.dt_close,
            metric=metric
        )

        # Private metrics come from our own tables, public ones via the external
        # client (this used to read the misspelled ``self.cli_externall``).
        if self.conf.is_metric_private(metric):
            cli = self.cli
        else:
            cli = self.cli_external
        df_metric = cli.query_dataframe(q_get)

        msk_dup = df_metric.duplicated(['coingecko_id'])
        if msk_dup.any():
            dup_cg_id = df_metric.loc[msk_dup, 'coingecko_id'].values.tolist()
            logging.warning(f'There are duplicated cg_id {dup_cg_id} for {metric}')

        df_metric = df_metric.drop_duplicates(subset=['coingecko_id'])
        df_metric = df_metric.rename(columns={'value': metric})
        return df_metric.set_index('coingecko_id')

    def get_input(self):
        """Return the portfolio input table ordered by ``coin_number`` (that column dropped)."""
        t = self.conf.get_input_table()
        input_ = self.cli.query_dataframe(
            """
            SELECT * FROM {}
            ORDER BY coin_number
            """.format(t)
        )
        return input_.drop(columns=['coin_number'])

    def get_info(self):
        """Return ``get_input()`` indexed by ``coingecko_id``."""
        return self.get_input().set_index('coingecko_id')

    def _wrt_base(self, prices_, coins, base_coin):
        """Express the prices of ``coins`` in units of ``base_coin``.

        :param prices_: long frame with ``coingecko_id``, ``dt_candle_close`` and
            ``price`` columns. It must also hold ``base_coin`` rows unless
            ``base_coin == 'usdt'``, in which case ``price`` is used unchanged.
        :return: frame with ``coingecko_id``, ``dt_candle_close`` and
            ``price_wrt_{base_coin}``. Dates without a base-coin price are dropped
            by the (inner) merge.
        """
        pc_prices = prices_[prices_['coingecko_id'].isin(coins)]
        if base_coin == 'usdt':
            return pc_prices[['coingecko_id', 'dt_candle_close', 'price']].rename(columns={'price': 'price_wrt_usdt'})

        bc_prices = (
            prices_[prices_['coingecko_id'] == base_coin]
            .rename(columns={'price': f'price_{base_coin}'})
            .drop(columns=['coingecko_id'])
        )

        pc_wrt_bc = pc_prices.merge(bc_prices, on=['dt_candle_close'])
        pc_wrt_bc[f'price_wrt_{base_coin}'] = pc_wrt_bc['price'] / pc_wrt_bc[f'price_{base_coin}']
        return pc_wrt_bc[['coingecko_id', 'dt_candle_close', f'price_wrt_{base_coin}']]

    def get_coins_by_day(self):
        """Return ids of input coins whose ``dt`` is strictly before ``self.dt_close``.

        The comparison is on string forms; this assumes ``dt`` renders as an
        ISO ``YYYY-MM-DD...`` value.
        """
        info = self.get_info()
        coins_today = info[info['dt'].astype(str) < self.dt_close].index.values.tolist()
        return coins_today

    def put_total_metric(self, metric, value):
        """Insert one ``(metric, value)`` row dated ``self.dt_close`` into the total table."""
        t_total = self.conf.get_total_table()
        q = "INSERT INTO {t} VALUES".format(t=t_total)
        values_dict = {
            'metric': metric,
            'value': value,
            'dt_candle_close': datetime.strptime(self.dt_close, '%Y-%m-%d'),
            'tech_load_ts': datetime.today()
        }
        num_written = self.cli.execute(q, [values_dict])
        logging.info(f'Written to {t_total} {num_written} rows')

    def _query_get_prices_dt_range(self, needed_coins, days):
        """Build the daily-price SELECT for ``needed_coins`` over ``(dt_close - days, dt_close]``.

        NOTE(review): ``needed_coins`` is interpolated through its Python ``repr``
        (``['a', 'b']``). An id containing a quote would break the query; driver-side
        query parameters would be safer.
        """
        first_dt = _dt_minus_days(self.dt_close, days)
        q = '''
            SELECT 
            coingecko_id, price, toString(date_of_candle_close) as dt_candle_close
            FROM {table_prices}
            WHERE 
                (coingecko_id in {lst_coins})  
                AND
                (dt_candle_close <= '{dt_close}')
                AND
                (dt_candle_close > '{first_dt}')
            '''.format(
                table_prices=self.conf.get_prices_daily_table(),
                lst_coins=needed_coins,
                dt_close=self.dt_close,
                first_dt=first_dt
            )
        logging.debug(q)
        return q

    def _get_prices_dt_close(self, coins, base_coins):
        """Fetch ``dt_close`` prices for ``coins`` plus the requested crypto base coins.

        ``base_coins`` is passed through ``conf.filter_crypto_base_coins``, so a
        non-crypto base such as ``'usdt'`` adds nothing (``_wrt_base`` needs no
        base-coin prices for it). The original ignored this argument and always
        queried every crypto base coin. Rows duplicated on
        ``(coingecko_id, dt_candle_close)`` are dropped.
        """
        base_coin_lst = self.conf.filter_crypto_base_coins(base_coins)
        q_prices_dt_close = self._query_get_prices_dt_range(coins + base_coin_lst, 1)
        prices_ = self.cli_external.query_dataframe(q_prices_dt_close).drop_duplicates(subset=['coingecko_id', 'dt_candle_close'])
        return prices_

    def get_prices_wrt_base_dt_close(self, coins, base_coin):
        """Return ``dt_close`` prices of ``coins`` expressed in ``base_coin`` (see ``_wrt_base``)."""
        prices_ = self._get_prices_dt_close(coins, [base_coin])
        return self._wrt_base(prices_, coins, base_coin)

    def get_prices_range(self, coins: list, base_coins: list, days: int):
        """Fetch the last ``days`` days of prices for ``coins`` and the crypto ``base_coins``.

        :return: long frame (``coingecko_id``, ``price``, ``dt_candle_close``) without
            duplicated ``(coingecko_id, dt_candle_close)`` rows.
        """
        base_coin_lst = self.conf.filter_crypto_base_coins(base_coins)
        q_prices_range = self._query_get_prices_dt_range(coins + base_coin_lst, days)
        prices_ = self.cli_external.query_dataframe(q_prices_range).drop_duplicates(subset=['coingecko_id', 'dt_candle_close'])
        return prices_

    def get_prices_wrt_base_range(self, coins, base_coin, days: int, pivoted: bool):
        """Return ``days`` days of ``coins`` prices in ``base_coin``.

        With ``pivoted=True`` the result is reshaped to one row per
        ``dt_candle_close`` and one column per ``coingecko_id``.
        """
        # Not used in the notebook; previously called the non-existent ``_get_prices_range``.
        prices_ = self.get_prices_range(coins, [base_coin], days)
        wrt_base = self._wrt_base(prices_, coins, base_coin)
        if pivoted:
            wrt_base = pd.pivot(wrt_base, index='dt_candle_close', columns='coingecko_id', values=f'price_wrt_{base_coin}')
        return wrt_base

    def put_metric_dt_candle_close(self, metric: str, row_value: pd.Series):
        """Write one computed ``metric`` value per coin for ``self.dt_close``.

        :param row_value: series indexed by ``coingecko_id`` holding the metric values.
        """
        df_metric = row_value.to_frame()
        df_metric = df_metric.reset_index()
        df_metric.columns = ['coingecko_id', 'value']

        df_metric['value'] = df_metric['value'].astype(float)
        df_metric['coingecko_id'] = df_metric['coingecko_id'].astype(str)

        df_metric['metric'] = metric
        df_metric['dt_candle_close'] = pd.to_datetime(self.dt_close)
        df_metric['tech_load_ts'] = datetime.today()

        table_dest = self.conf.get_table_by_metric(metric)
        num_written = self.cli.execute(
            'INSERT INTO {}  VALUES '.format(table_dest),
            df_metric.to_dict('records')
        )
        logging.info(f'Writen {num_written} into {table_dest}')

    def cg2ticker(self, metric_df_pivoted):
        """Rename ``coingecko_id`` columns of a pivoted frame to tickers from ``get_info()``."""
        info = self.get_info()
        return metric_df_pivoted.rename(columns=info['ticker'].to_dict())


In [36]:
class ConfigInterface:
    """Accessors over the YAML run configuration loaded from ``path_conf``.

    Table names under ``db-tables.portfolio`` are qualified with the portfolio
    schema (``db-tables.portfolio.schema``) as ``'<schema>.<table>'``.
    """

    def __init__(self, path_conf):
        self.config = read_yaml_config(path_conf)
        self.sch = self.config['db-tables']['portfolio']['schema']
        self.dict_metric_table = self.get_dict_metric_table()
        self.dict_output_table = self.get_dict_output_table()

        # Directory of the config file; substituted for ``{conf_path}`` in get_path_js_key.
        self.path_to_conf_files = os.path.dirname(path_conf)

    def get_number_of_metric_in_total(self, metric):
        """Return the 1-based position of ``metric`` in ``output.dict_metric_output``.

        :raises KeyError: if ``metric`` is not listed (an error is logged first).
            The original fell through its loop and silently returned the position
            of the last metric (or raised ``NameError`` on an empty mapping).
        """
        ordered_metrics = list(self.dict_output_table)
        if metric not in ordered_metrics:
            logging.error(f'No such metric {metric} in output total')
            raise KeyError(metric)
        return ordered_metrics.index(metric) + 1  # return from 1

    def port_st(self, t):
        """Qualify table ``t`` with the portfolio schema."""
        return self.sch_table(self.sch, t)

    def sch_table(self, sch, t):
        """Return ``'<sch>.<t>'``."""
        return f'{sch}.{t}'

    def get_input_table(self):
        return self.port_st(self.config['db-tables']['portfolio']['input'])

    def get_prices_daily_table(self):
        # Returned verbatim; not schema-qualified by port_st.
        return self.config['db-tables']['prices']

    def get_total_table(self):
        return self.port_st(self.config['db-tables']['portfolio']['total'])

    def get_table_by_metric(self, metric):
        return self.dict_metric_table[metric]

    def get_all_metrics(self):
        return list(self.get_dict_metric_table())

    def get_dict_metric_table(self):
        """Map each metric in ``db-tables.portfolio.metrics-tables`` to its qualified table."""
        raw_tables = self.config['db-tables']['portfolio']['metrics-tables']
        return {k: self.port_st(v) for k, v in raw_tables.items()}

    def get_dict_output_table(self):
        return self.config['output']['dict_metric_output']

    def get_path_js_key(self):
        """Return the Google key-file path with ``{conf_path}`` replaced by the config directory."""
        file_p = self.config['google-sheet']['path-to-key-file']
        return file_p.format(conf_path=self.path_to_conf_files)

    def get_crypto_base_coins(self):
        """Return the configured base coins without ``'usdt'``, always as a new list.

        The original returned ``config['base-coins']`` itself when ``'usdt'`` was
        absent, letting callers mutate the config by accident.
        """
        return [coin for coin in self.get_base_coins() if coin != 'usdt']

    def filter_crypto_base_coins(self, coins) -> list:
        """Keep, in order, the entries of ``coins`` that are crypto base coins."""
        crypto = set(self.get_crypto_base_coins())
        return [coin for coin in coins if coin in crypto]

    def get_base_coins(self):
        return self.config['base-coins']

    def get_sheet_id(self):
        return self.config['google-sheet']['sheet-id']

    def get_first_date(self):
        return self.config['output']['first_date']

    def is_metric_private(self, metric):
        """A metric is private unless it is listed under ``db-tables.public``."""
        return metric not in self.config['db-tables']['public']
    
class ConfigInterfaceDynamic(ConfigInterface):
    """``ConfigInterface`` plus accessors for the dynamic-portfolio tables.

    Constructed exactly like ``ConfigInterface`` (same ``path_conf`` argument).
    Each accessor returns ``db-tables.portfolio.<key>`` qualified with the
    portfolio schema.
    """

    def _portfolio_table(self, key):
        # Qualify db-tables.portfolio.<key> with the portfolio schema.
        return self.port_st(self.config['db-tables']['portfolio'][key])

    def get_t_portfolio_coins(self):
        return self._portfolio_table('portfolio_coins')

    def get_t_unrealized(self):
        return self._portfolio_table('unrealized')

    def get_t_realized(self):
        return self._portfolio_table('realized_history')

    def get_t_snapshots(self):
        # NOTE(review): the config key is spelled 'snaphots'; keep it in sync with the YAML.
        return self._portfolio_table('snaphots')
    
    
    
    

In [71]:
class DataInterfaceDynamic(DataInterface):
    """``DataInterface`` plus the queries used to roll the dynamic portfolio forward.

    ``conf`` is expected to provide the dynamic-table accessors
    (``get_t_portfolio_coins``, ``get_t_realized``, ``get_t_unrealized``).
    ``prev_dt_close`` is the day before ``dt_close`` (``'%Y-%m-%d'``).
    """

    def __init__(self, cli, conf, dt_close, cli_external=None):
        super().__init__(cli, conf, dt_close, cli_external)
        self.prev_dt_close = (
            datetime.strptime(dt_close, '%Y-%m-%d') - timedelta(days=1)
        ).strftime('%Y-%m-%d')

    def get_candidates_to_portfolio(self, exclude=('arbitrum',)) -> list:
        """Return the coin ids currently selected for the dashboard, minus ``exclude``.

        :param exclude: ids to leave out. The default keeps the original
            ``'arbitrum'`` exclusion; absent ids are ignored instead of raising
            ``ValueError``.
        """
        # Use the instance's client, not the notebook-global ``cli``.
        actual_coins = self.cli.query_dataframe('select * from DEV_ODS_CROSS_BC_RESOURCES.SELECTED_FOR_DASH_TOKENS_V')
        return [
            coin for coin in actual_coins['coingecko_id'].values.tolist()
            if coin not in exclude
        ]

    def get_portfolio_coins_dt(self) -> list:
        """Return the portfolio membership stored for ``prev_dt_close``.

        :raises IndexError: if no row exists for ``prev_dt_close``.
        """
        t = self.conf.get_t_portfolio_coins()
        q = '''
            SELECT coins_in_portolio
            FROM {t}
            WHERE dt_candle_close = '{prev_dt_close}'
        '''.format(
            t=t,
            prev_dt_close=self.prev_dt_close
        )

        stored = self.cli.query_dataframe(q)['coins_in_portolio'].values
        if len(stored) == 0:
            raise IndexError(f'No portfolio coins stored in {t} for {self.prev_dt_close}')
        # If several rows match, the first one is used.
        return stored[0]

    def get_inter_coins_today(self, exclude=('ethereum', 'bitcoin')):
        """Return today's candidates united with yesterday's portfolio, minus ``exclude``.

        The default drops the ``'ethereum'`` / ``'bitcoin'`` base coins. The
        original removed them with ``list.remove`` and crashed if either was
        absent. The result is sorted so it does not depend on set iteration order.
        """
        coins_ = set(self.get_candidates_to_portfolio()) | set(self.get_portfolio_coins_dt())
        return sorted(coin for coin in coins_ if coin not in exclude)

    def get_btc_sum(self, coins):
        """Return ``days_above_200EMA`` rows of ``EMA200_BTC_SUMMARY`` for ``coins`` on ``dt_close``.

        ``dt_candle_close`` is derived in the query as ``toDate(tech_load_ts) - 1``.
        """
        q = '''
                SELECT coingecko_id, days_above_200EMA, (toDate(tech_load_ts) - 1) as dt_candle_close, tech_load_ts
                FROM DEV_DM_ANALYTCS.EMA200_BTC_SUMMARY
                WHERE 
                    (coingecko_id in {coins_today})
                    AND
                    (dt_candle_close = '{dt_close}')
            '''.format(
                coins_today=coins,
                dt_close=self.dt_close
            )
        # Use the instance's client, not the notebook-global ``cli``.
        btc_sum = self.cli.query_dataframe(q)

        return btc_sum

    def _smash_df(self, df):
        """Drop ``tech_load_ts`` and sort by ``dt_candle_close``."""
        df = df.drop(columns=['tech_load_ts'])
        return df.sort_values(by='dt_candle_close')

    def get_dynaimic_state(self):
        """Load the dynamic-portfolio state carried into ``dt_close``.

        :return: ``(realized, unrealized)``. ``realized`` holds the realized-history
            rows dated before ``dt_close``; ``unrealized`` holds the unrealized rows
            stored for ``prev_dt_close``. Both are passed through ``_smash_df``.
        """
        q_realized = '''
            SELECT *
            FROM {t}
            WHERE dt_candle_close < '{dt_close}'
        '''.format(
            t=self.conf.get_t_realized(),
            dt_close=self.dt_close
        )

        # Read the UNREALIZED table; this previously queried realized history again.
        q_unrealized = '''
            SELECT *
            FROM {t}
            WHERE dt_candle_close == '{prev_dt_close}'
        '''.format(
            t=self.conf.get_t_unrealized(),
            prev_dt_close=self.prev_dt_close
        )

        realized = self.cli.query_dataframe(q_realized)
        unrealized = self.cli.query_dataframe(q_unrealized)

        return self._smash_df(realized), self._smash_df(unrealized)
        
        
        
    


In [72]:
# Candle-close date processed by this run.
dt_close = '2023-04-09'
# Dynamic-portfolio configuration (table names, base coins, ...).
conf = ConfigInterfaceDynamic('configs/conf_new.yaml')

# One ClickHouse connection serves both the private tables and the external prices.
clii = DataInterfaceDynamic(cli=cli, conf=conf, dt_close=dt_close, cli_external=cli)

In [73]:
# Sanity check: display the fully-qualified realized-history table for this config.
conf.get_t_realized()

'DEV_PORTFOLIO_DYNAMIC_5_2023_03_26.REALIZED_HISTORY'

In [74]:
# Coins to evaluate today (candidates united with yesterday's portfolio) and
# their dt_close prices expressed in each base currency.
coins = clii.get_inter_coins_today()

prices_wrt_btc, prices_wrt_eth, prices_wrt_usd = (
    clii.get_prices_wrt_base_dt_close(coins, base)
    for base in ('bitcoin', 'ethereum', 'usdt')
)

In [75]:
# One row per (coin, date) with its price in BTC, ETH and USDT terms
# (columns price_wrt_bitcoin, price_wrt_ethereum, price_wrt_usdt).
# Fixes the NameError from the misspelled `price_wrt_eth`.
prices = (
    prices_wrt_btc
    .merge(prices_wrt_eth, on=['coingecko_id', 'dt_candle_close'])
    .merge(prices_wrt_usd, on=['coingecko_id', 'dt_candle_close'])
)

In [76]:
# days_above_200EMA per coin from EMA200_BTC_SUMMARY for dt_close.
btc_sum_today = clii.get_btc_sum(coins)

In [62]:
def handle_out(dt_out, coins_out, unrealized, realized, prices):
    """Close the open position of every coin in ``coins_out`` at ``dt_out`` prices.

    For each coin, the single row of ``unrealized`` with that ``coingecko_id`` is
    valued at the ``dt_out`` prices. The result is appended to ``realized`` under
    the label ``'<coin> <dt_in> <dt_out>'``, and the position is dropped from
    ``unrealized``, whose index is expected to be ``'<coin> <dt_in>'``.

    Prices are read from the ``price_wrt_usdt`` / ``price_wrt_bitcoin`` /
    ``price_wrt_ethereum`` columns, which are the names
    ``DataInterface._wrt_base`` produces for those bases.

    :return: ``(unrealized, realized)``. ``unrealized`` is a new frame;
        ``realized`` is also modified in place.
    :raises ValueError: if a coin does not have exactly one open position, or
        exactly one price row for ``dt_out``.
    """
    for coin_out in coins_out:
        logging.debug(f'out {coin_out} {dt_out}')

        open_rows = unrealized.loc[unrealized['coingecko_id'] == coin_out]
        if len(open_rows) != 1:
            raise ValueError(
                f'Expected exactly one open position for {coin_out}, found {len(open_rows)}'
            )
        prices_in = open_rows.iloc[0]

        dt_in = prices_in['dt_in']
        price_in_usd = prices_in['price_in_usd']
        price_in_btc = prices_in['price_in_btc']
        price_in_eth = prices_in['price_in_eth']
        usd_in = prices_in['usd_in']
        btc_in = prices_in['btc_in']
        eth_in = prices_in['eth_in']

        out_rows = prices[
            (prices['coingecko_id'] == coin_out) & (prices['dt_candle_close'] == dt_out)
        ]
        if len(out_rows) != 1:
            raise ValueError(
                f'Expected exactly one price row for {coin_out} on {dt_out}, found {len(out_rows)}'
            )
        prices_out = out_rows.iloc[0]

        # The USDT column is 'price_wrt_usdt'; the original read a non-existent 'price_wrt_usd'.
        price_out_usd = prices_out['price_wrt_usdt']
        price_out_btc = prices_out['price_wrt_bitcoin']
        price_out_eth = prices_out['price_wrt_ethereum']

        # Each ledger scales by the coin's price ratio between entry and exit.
        usd_out = usd_in / price_in_usd * price_out_usd
        btc_out = btc_in / price_in_btc * price_out_btc
        eth_out = eth_in / price_in_eth * price_out_eth

        fixed_usd = usd_out - usd_in
        fixed_btc = btc_out - btc_in
        fixed_eth = eth_out - eth_in

        realized.loc['{} {} {}'.format(coin_out, dt_in, dt_out)] = {
            'coingecko_id': coin_out,
            'dt_in': dt_in,
            'dt_out': dt_out,
            'price_in_usd': price_in_usd,
            'price_in_btc': price_in_btc,
            'price_in_eth': price_in_eth,
            'usd_in': usd_in,
            'btc_in': btc_in,
            'eth_in': eth_in,

            'price_out_usd': price_out_usd,
            'price_out_btc': price_out_btc,
            'price_out_eth': price_out_eth,
            'usd_out': usd_out,
            'btc_out': btc_out,
            'eth_out': eth_out,

            'fixed_usd': fixed_usd,
            'fixed_btc': fixed_btc,
            'fixed_eth': fixed_eth,
        }

        unrealized = unrealized.drop('{} {}'.format(coin_out, dt_in))

    return unrealized, realized

def handle_in(dt_calc, coins_in, unrealized, prices, amount=10_000):
    """Open a position in every coin of ``coins_in`` at ``dt_calc`` prices.

    Each position is added to ``unrealized`` (in place) under the label
    ``'<coin> <dt_calc>'``. The entry prices are taken from the
    ``price_wrt_usdt`` / ``price_wrt_bitcoin`` / ``price_wrt_ethereum`` columns,
    and ``usd_in``, ``btc_in`` and ``eth_in`` are all set to ``amount``.

    :return: ``unrealized`` (the same object, enlarged).
    :raises ValueError: if a coin does not have exactly one price row for ``dt_calc``.
    """
    dt_in = dt_calc
    for coin_in in coins_in:
        logging.debug(f'in {coin_in} {dt_in}')

        rows = prices[
            (prices['coingecko_id'] == coin_in) & (prices['dt_candle_close'] == dt_in)
        ]
        if len(rows) != 1:
            raise ValueError(
                f'Expected exactly one price row for {coin_in} on {dt_in}, found {len(rows)}'
            )
        prices_in = rows.iloc[0]

        unrealized.loc['{} {}'.format(coin_in, dt_in)] = {
            'coingecko_id': coin_in,
            'dt_in': dt_in,
            # The USDT column is 'price_wrt_usdt' (the original read 'price_wrt_usd').
            'price_in_usd': prices_in['price_wrt_usdt'],
            'price_in_btc': prices_in['price_wrt_bitcoin'],
            'price_in_eth': prices_in['price_wrt_ethereum'],

            'usd_in': amount,
            'btc_in': amount,
            'eth_in': amount
        }

    return unrealized

def make_snapshot(dt_calc, unrealized, prices):
    """Placeholder for building a portfolio snapshot; not implemented yet, returns ``None``."""
    return None

def step_day(dt_calc, coins_this_day, portfolio_members, unrealized, realized, prices):
    """Advance the dynamic portfolio by one day.

    The previous membership is the ``coins`` value in the last row of
    ``portfolio_members``. Coins in it but not in ``coins_this_day`` are closed
    into ``realized``; coins new today are opened in ``unrealized``. ``snapshot``
    values every position opened before ``dt_calc`` that survives today's exits,
    at ``dt_calc`` prices. It is built on copies, so ``unrealized`` and
    ``realized`` are not affected by it.

    :return: ``(portfolio_members, unrealized, realized, snapshot)``.
    """
    # Use the argument; the original overwrote it with the notebook-global
    # ``dict_above_ema[dt_calc]``.
    prev_coins = portfolio_members.iloc[-1]['coins']

    coins_out = [coin for coin in prev_coins if coin not in coins_this_day]
    coins_in = [coin for coin in coins_this_day if coin not in prev_coins]

    unrealized, realized = handle_out(dt_calc, coins_out, unrealized, realized, prices)

    # Mark-to-market of positions that remain open (entered before today).
    snapshot = pd.DataFrame(columns=realized.columns)
    unrealized_before = unrealized[unrealized['dt_in'] < dt_calc]
    coins = unrealized_before['coingecko_id'].values.tolist()
    _, snapshot = handle_out(dt_calc, coins, unrealized_before, snapshot, prices)

    unrealized = handle_in(dt_calc, coins_in, unrealized, prices)
    portfolio_members.loc[dt_calc] = [dt_calc, coins_this_day]

    return portfolio_members, unrealized, realized, snapshot

In [63]:
# `get_dynaimic_state` returns (realized, unrealized); unpacking three names raised ValueError.
# FIXME(review): `portfolio_members` (needed by step_day) is not loaded by any visible cell.
realized, unrealized = clii.get_dynaimic_state()

In [None]:
# FIXME(review): `dt_calc` and `dict_above_ema` are not defined in any visible cell
# (presumably left over from a removed cell), so this fails on a fresh kernel.
portfolio_members, unrealized, realized, snapshot = step_day(
    dt_calc=dt_calc,
    coins_this_day=dict_above_ema[dt_calc],
    portfolio_members=portfolio_members,
    unrealized=unrealized,
    realized=realized,
    prices=prices,
)