In [1]:
import pandas as pd
import numpy as np
import pickle
import os
import asyncio
import datetime
from datetime import datetime
from datetime import timedelta, timezone
from typing import Optional
from dotenv import load_dotenv
from operator import itemgetter

load_dotenv()
# Project root. Taken from the PWD variable (load_dotenv() above may supply it
# from a .env file); falls back to the current working directory when PWD is
# not set, instead of failing later with "NoneType + str".
PWD = os.getenv("PWD") or os.getcwd()
# Build paths with os.path.join so the separator matches the host OS
# (the result is unchanged on Windows).
db_name = os.path.join(PWD, "database", "RVNUSDT.db")
import sys

# Make project packages importable. Both inserts use position 1, so PWD ends
# up ahead of PWD/modules on sys.path.
sys.path.insert(1, os.path.join(PWD, "modules"))
sys.path.insert(1, PWD)
from alg_modules.alg_handler import AlgHandler
from plot_modules.candle_plot import CandlePlot
from collections import deque
from paper_trade import PaperTrader
import time
import logging

# Logging setup.
# DEBUG mirrors Python's __debug__ flag (False only when run with -O).
# With DEBUG on, records go to the default stream handler; with DEBUG off they
# go to LOG_FILE_NAME and are additionally echoed through a StreamHandler.
DEBUG = __debug__
LOG_FILE_NAME = "log_file_name.log"
# Named LOG_FORMAT rather than `format` so the builtin format() is not shadowed.
LOG_FORMAT = "%(asctime)s [%(levelname)s]: %(message)s"
logging.basicConfig(
    filename=LOG_FILE_NAME if not DEBUG else None,
    format=LOG_FORMAT,
    encoding="utf-8",
    level=logging.INFO,
)
# basicConfig() returns None, so fetch the root logger explicitly instead of
# storing that None and passing it back into getLogger().
logger = logging.getLogger()
if not DEBUG:
    logger.addHandler(logging.StreamHandler())

from stop_loss import StopLoss
from trade_strategy import TradeStrategy
from wss_thread import WssThread
from api_modules.open_binance_api import OpenBinanceApi
import pytz
# Europe/Moscow timezone object; not referenced by any other cell visible in this notebook.
tzdata = pytz.timezone('Europe/Moscow') 
from utils.time_utils import sleep_until

from data_API.data_API_wrapper import DataApiWrapper

In [2]:
# Measure the offset between the exchange clock and this machine's clock.
# `delay` (server minus local) is reused by the trading loop to turn a
# server-side wake-up time into a local one.
server_epoch_ms = OpenBinanceApi.server_time()  # divided by 1000 below, so presumably milliseconds
server_time = datetime.fromtimestamp(server_epoch_ms / 1000)
local_time = datetime.now()
delay = server_time - local_time
logging.info(f'server_time: {server_time}')
# logging.info(f'local_time: {local_time}')
# Cell output: the offset in seconds.
delay.total_seconds()

2022-03-27 02:19:49,243 [INFO]: server_time: 2022-03-27 02:19:51.621000


2.377899

In [3]:
# notifications
from win10toast import ToastNotifier
# Single notifier instance shared by every notification in this notebook.
toast = ToastNotifier()

# Keyword arguments passed unchanged to every show_toast() call.
static_notification_settings = {
    "title": "Algo traid BOT",
    "duration": 20,
    "icon_path": "python.ico",
    "threaded": 1,
}


def notify(msg):
    """Show `msg` as a toast with the shared settings and return show_toast()'s result."""
    return toast.show_toast(msg=msg, **static_notification_settings)

async def notification(msg):
    """Show `msg` via notify(), retrying once after 20 seconds.

    If the first notify() call returns a falsy value (presumably because a
    previous toast is still being displayed - confirm against win10toast),
    the coroutine sleeps for 20 seconds and calls notify() a second time.
    The result of the second attempt is not checked. Returns None.
    """
    delivered = notify(msg)
    if not delivered:
        await asyncio.sleep(20)
        notify(msg)
msg="Watch out for notifications from here"

await notification(msg)

In [4]:
# Data API connection: the wrapper that receives MA settings, candle data and
# the trade log from the cells below.
endpoint = 'http://192.168.1.125:8000'
data_API = DataApiWrapper(endpoint=endpoint)

In [5]:
# --- Tunables ---------------------------------------------------------------
DATA_AWAIT_TIME = 30  # seconds to wait before polling again when the data is not valid yet
SERVER_DELAY = 10  # seconds added to every scheduled wake-up
# INTERVAL_SECONDS = 60 # seconds
INTERVAL_SECONDS = 60*5  # seconds; must describe the same candle length as INTERVAL
# INTERVAL = '1m'
INTERVAL = '5m'
STOP_LOSS_ENABLED = True
STOP_LOSS_THRESHOLD = -1.3
DEQUE_MAX_LENGTH = 200  # number of candles kept in the sliding window
MA_list = (1, 7, 25, 100)
# Single initialisation (was assigned False and later 0); 0 is the value the
# trading loop has always started from.
stop_loss_trade_flag = 0

# --- Live ticker stream -----------------------------------------------------
w = WssThread(
    url='wss://stream.binance.com:9443/ws/rvnusdt@ticker',
    maxlen=10,
)
w.start()
# The only other w.close() lives in the trading loop's `finally`, which is not
# reached if anything below raises. Close the stream here on failure so a
# broken setup does not leave the thread running.
try:
    # wait for wss to open
    time.sleep(0.3)

    # --- Historical candles -------------------------------------------------
    df = OpenBinanceApi.get_df(
        pair='RVNUSDT',
        interval=INTERVAL,
        limit=1000,
    )
    # Drop the last TWO rows. The trading loop appends the second-to-last row
    # of each fresh fetch on its first valid pass, so keeping either of these
    # rows here would presumably duplicate it in the window.
    # TODO: assert that the loop never appends a row that is already present.
    df = df[:-2]

    # Sliding window of per-candle dicts (column name -> value).
    window = deque(maxlen=DEQUE_MAX_LENGTH)
    window.extend(dict(row) for _, row in df.iterrows())

    # --- Paper trading state ------------------------------------------------
    # initial currency resources
    p_trdr = PaperTrader(
        main_currency_label='RVN',
        secondary_currency_label='USD',
        main_currency_amount=100,
        secondary_currency_amount=0.01,
        fee=0.1,
    )
    # Empty trade log with the same columns PaperTrader reports, plus 'reason'.
    trade_data = pd.DataFrame(
        columns=p_trdr.get_df(timestamp=df.iloc[-1]['Date']).columns.values
    )
    trade_data['reason'] = ''
    # with open(f'{PWD}/temp/trade_data_raw.csv', 'w')as f:
    #     f.write(trade_data.to_csv())
    # with open(f'{PWD}/temp/df_raw.csv', 'w')as f:
    #     f.write(df.to_csv())
    stop_loss = StopLoss(
        STOP_LOSS_THRESHOLD=STOP_LOSS_THRESHOLD,
    )
    # init alg
    alg = AlgHandler(
        df=pd.DataFrame([]),
        MA_list=MA_list,
    )

    # --- Publish the initial state to the data API --------------------------
    data_API.update_ma_lines(MA_list)
    data_API.update_stock_data(df)
    data_API.update_trade_data(trade_data)
except BaseException:
    # Includes KeyboardInterrupt: stop the stream, then let the error surface.
    w.close()
    raise
try:
    while 1:
        logging.info('===get new data===')
        new_df = OpenBinanceApi.get_df(
                pair = 'RVNUSDT',
                interval = INTERVAL,
                limit = 2, # mb we need 2 here
            )
        dt = datetime.fromtimestamp(int(new_df.Real_Date[-1:])/1000)
        server_time = datetime.fromtimestamp(OpenBinanceApi.server_time()/1000)
        logging.debug(f'server time: {server_time}   {server_time.minute=}, {dt.minute=}')
        # extract function?
        
        # works if delay between server and client within 1 minute range
        if server_time.minute == dt.minute:
            logging.debug('+++===success===+++')
            # add full candle to sliding window; cant be sure that last candle is full
            window.append(dict(new_df[-2:-1].squeeze()))
            time_cursor = pd.to_datetime(new_df[-2:-1].Date[0])
            # logging.warning(f'{time_cursor=}')
            df_ = pd.DataFrame(window)
            data_API.update_stock_data(df_)

            # === process data here ===

            stop_loss_count, trade_data, p_trdr, stop_loss_trade_flag = TradeStrategy.trade_strategy(
                STOP_LOSS_ENABLED=STOP_LOSS_ENABLED, 
                STOP_LOSS_THRESHOLD=STOP_LOSS_THRESHOLD, 
                trade_data=trade_data,
                stop_loss=stop_loss,
                p_trdr=p_trdr,
                window=window,
                alg=alg,
                stop_loss_trade_flag=stop_loss_trade_flag,
                # df=df_,
                wss=w,
                # add columns notation Data/datate_created; high_/High; ...
            )

            # === end of data processing ===

            data_API.update_trade_data(trade_data)
            
            time_to_sleep = dt - delay + timedelta(seconds=SERVER_DELAY) + timedelta(seconds=INTERVAL_SECONDS)
            server_delay = dt - server_time
            logging.info(f'server valid time: {server_time}')
            logging.info(f'server delay: {server_delay.total_seconds()}')
            logging.info(f'sleep till: {time_to_sleep}')
            await sleep_until(time_to_sleep)
        else:
            logging.info('---not valid---')
            logging.debug(f'sleep {DATA_AWAIT_TIME} sec')
            await asyncio.sleep(DATA_AWAIT_TIME)
        
finally:
    # close thread
    w.close()
    # trade.close()
    # notify that we done
    await notification('App terminated')


2022-03-27 02:19:50,951 [INFO]: ### Opening wss stream ###
2022-03-27 02:19:51,016 [INFO]: ===get new data===
2022-03-27 02:19:51,803 [INFO]: ---not valid---


In [None]:
# Always fails. Presumably a deliberate stop so that "Run All" does not
# continue into the scratch cells below - confirm before removing.
assert 0

In [None]:
# 25 100


In [None]:
# Scratch: display the current trade log.
trade_data

In [None]:
# Scratch: push every row of `df` into the sliding window as a dict.
# Same operation as the window fill in the main cell; running it in addition
# to that cell appends the same rows a second time.
window.extend(dict(row.squeeze()) for _, row in df.iterrows())

In [None]:
# Scratch: view the sliding window as a DataFrame.
pd.DataFrame(window)

In [None]:
# Scratch: drop the last row of `df` and display the result.
# NOTE: not idempotent - every run of this cell removes one more row,
# because `df` is reassigned to a shorter slice of itself.
df = df[:-1]
df

In [None]:
def process_data(new_df: pd.DataFrame, window: deque,) -> pd.DataFrame:
    """Add new candle data to the sliding window and return the window as a frame.

    Args:
        new_df: New data. A DataFrame contributes one record (dict of
            column -> value) per row, matching the per-row dicts the rest of
            the notebook stores in the window. Any other value (e.g. a single
            row dict) is appended unchanged.
        window: Deque of per-row records; mutated in place. If it has a
            maxlen, the oldest records are discarded as new ones arrive.

    Returns:
        A DataFrame built from the updated window contents.
    """
    if isinstance(new_df, pd.DataFrame):
        # Appending the frame itself would put a whole DataFrame next to the
        # per-row dicts and break the pd.DataFrame(window) conversion below.
        window.extend(new_df.to_dict("records"))
    else:
        window.append(new_df)
    return pd.DataFrame(window)

In [None]:
# Scratch: show the latest fetched frame, squeezed and converted to a dict.
dict(new_df.squeeze())

In [None]:
# Scratch: show the oldest record currently held in the sliding window.
window[0]

In [None]:
# Scratch: display the current trade log.
trade_data