In [None]:
# This notebook's main loop restarts the user-stream socket whenever its thread is no longer alive.

In [1]:
import logging, asyncio, json, threading, time, os, sys, requests, websocket

import mplfinance as mpl
logging.basicConfig(filename='bot.log',
					level=logging.INFO,
					format='%(asctime)s %(message)s')

import telegram
from telegram.ext import Updater, CallbackContext, MessageHandler, Filters
from telegram.ext import CommandHandler, CallbackQueryHandler
from telegram import InlineKeyboardButton, InlineKeyboardMarkup, Update, ReplyKeyboardMarkup, KeyboardButton

import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from pprint import pprint
import matplotlib.pyplot as plt
from termcolor import colored, cprint

# from binance.client import Client
from binance import Client, ThreadedWebsocketManager, ThreadedDepthCacheManager
from binance import BinanceSocketManager, AsyncClient
from binance.helpers import date_to_milliseconds
from binance.enums import *
from binance.exceptions import BinanceAPIException, BinanceOrderException
# from binance.websockets import BinanceSocketManager

from finrl.preprocessing.preprocessors import FeatureEngineer


In [2]:

# globals
# User-tunable configuration. Everything below is read by the functions of
# this notebook through `global` statements.
###########################
symbol = 'BTCBUSD'
interval = '1h'
# risk/reward only set the lb/ub levels computed in go_long/go_short; those
# levels are printed and logged, but no order is triggered by them here
risk, reward = 0.002, 0.003
slow_ema_period = 21*4 # EMA length in candles; for 4h candles, change for other intervals
fast_ema_period = 7*4
max_reconnects = 20
telegramApiKey = 'your_telegram_bot_api_key' # placeholder - never commit a real token
pathToAPIkeys = '' # if the path is not this directory, END with a slash
###########################

# declarations
# Placeholders for the shared state; init() and the trading functions
# overwrite them at runtime.
###########################
price_base = 0
base_amount, quote_amount = 0, 0
base_currency, quote_currency = 0, 0
df = pd.DataFrame()
signal = 'hold'
state = 'None'
action = 'None'
lb, ub = np.nan, np.nan
MIN_NOTIONAL, stepSize, precision = 0, 0, 0
api_key, api_secret = 0, 0
client, bsm, conn_key, updater = 0, 0, 0, 0
old_time = 0
valid_base_quantity = 0
next_time = 0
n_minutes = 0
fe = None
prof_btc_pc, prof_usdt_pc = 0, 0
init_base, init_quote = 0, 0
cum_commission = 0
twm = 0
entry_price = 0
lookback_s = 0
init_lookback_s = 0
thread = 0
Msg = 0
stopMain = False
user_stream_socket = None
###########################


In [3]:

class MyBinanceSocket(threading.Thread):
	"""Daemon thread that owns one Binance websocket connection.

	With ``klines=False`` (the default) the socket is opened on the address
	built from the listenKey (the user stream); with ``klines=True`` it is
	opened on the kline stream of the module-level ``symbol``/``interval``.
	Execution reports for ``symbol`` are forwarded through ``report`` and
	their commission is added to the global ``cum_commission``.
	"""

	def __init__(self, klines=False):
		"""Read the API credentials from ``loadEnv.sh`` and build the socket.

		Args:
			klines: if truthy, subscribe to the kline stream instead of the
				user stream.

		Raises:
			OSError: if ``loadEnv.sh`` cannot be opened or contains no
				``binance_api`` entry.
		"""
		global pathToAPIkeys
		threading.Thread.__init__(self)
		self.klines = klines
		self.api_key, self.api_secret = None, None
		with open(f'{pathToAPIkeys}loadEnv.sh', 'r') as f:
			lines = f.readlines()
		for line in lines:
			# The value is whatever follows the last '='. Strip only the line
			# terminator so that a final line without a newline is not truncated.
			value = line.split('=')[-1].rstrip('\r\n')
			for i in line.split('='):
				if i.endswith('binance_api'):
					self.api_key = value
					break
				if i.endswith('binance_secret'):
					self.api_secret = value
					break
		if self.api_key is None:
			raise OSError(f'no binance_api entry found in {pathToAPIkeys}loadEnv.sh')

		self.base_api = 'https://api.binance.com'
		self.ext = '/api/v3/userDataStream'
		self.headers = {'X-MBX-APIKEY': self.api_key}
		self.daemon = True
		self.create_socket()

	def create_socket(self):
		"""Request a listenKey and build the ``WebSocketApp`` (does not connect yet)."""
		global interval, symbol
		reportAndLog('creating a new socket...')
		r = requests.post(f'{self.base_api}{self.ext}', headers=self.headers)
		self.listen_key = json.loads(r.text)['listenKey']
		# The listenKey is requested in both modes because run() sends it as
		# the ping payload.
		if self.klines:
			self.sock_address = f"wss://stream.binance.com:9443/ws/{(symbol.lower())}@kline_{interval}"
		else:
			self.sock_address = f"wss://stream.binance.com:9443/ws/{self.listen_key}"
		self.ws = websocket.WebSocketApp(self.sock_address,
							on_message=self.on_message,
							on_close=self.on_close,
							on_ping=self.on_ping,
							on_pong=self.on_pong,
							on_error=self.on_error)
		reportAndLog('success ...')

	def run(self):
		"""Thread body: run the websocket until it is closed."""
		self.ws.run_forever(ping_interval=600, ping_payload=self.listen_key)

	def stop(self):
		"""Close the websocket.

		The thread body is ``run_forever``; this method only asks the
		websocket to close and returns normally.
		"""
		self.ws.close()

	def on_message(self, ws, msg):
		"""Log every message; report execution reports for ``symbol``."""
		global Msg, cum_commission, symbol
		logging.info(msg)
		msg_ = json.loads(msg)
		# Not every event is guaranteed to carry the 'e'/'s' fields, so look
		# them up defensively and short-circuit instead of indexing.
		is_execution_report = (
			(not self.klines)
			and msg_.get('e') == 'executionReport'
			and msg_.get('s') == symbol
		)
		if is_execution_report:
			Msg = msg_
			report(f"ID: {msg_['i']}\n{msg_['X']}\nSymbol: {msg_['s']}\nSide: {msg_['S']}\nType: {msg_['o']}\nQty: {msg_['q']}\nPrice: {msg_['p']}\nCum_qty: {msg_['z']}\nCommission: {msg_['n']}")
			# NOTE(review): the commission asset (field 'N') is not checked, so
			# amounts paid in different assets are summed together - confirm.
			cum_commission += float(msg_['n'])
		if self.klines and 'k' in msg_ and msg_['k']['x']:
			# 'x' is truthy on the message for which the candle is flagged closed
			print(datetime.fromtimestamp(int(msg_['k']['t'])/1000))
			print(msg_)
		logging.info(msg_)

	def on_close(self, ws, close_status_code, close_msg):
		"""Print/log the closure and report the close code and message if any."""
		print('closed')
		print("on_close args:")
		if close_status_code or close_msg:
			report("close status code: " + str(close_status_code))
			report("close message: " + str(close_msg))
		logging.info('socket closed')

		# The socket is not re-created here; main() starts a new
		# MyBinanceSocket when this thread is no longer alive.

	def on_error(self, wsapp, message):
		"""Log the error passed in by the websocket library."""
		logging.info(f'error in socket: {message}')

	def on_ping(self, wsapp, message):
		"""On a ping from the server, refresh the listenKey via ``send_put``."""
		print(f"Got a ping! Klines: {self.klines}")
		self.send_put()
		logging.info(f"Got a ping! Klines: {self.klines}")

	def on_pong(self, wsapp, message):
		"""On a pong from the server nothing needs to be done; it is only logged."""
		print("Got a pong! No need to respond")
		logging.info('got a pong')

	def send_put(self):
		"""Re-request the listenKey and store the one returned.

		NOTE(review): despite its name this issues the same POST that
		``create_socket`` uses; the Binance keep-alive call is documented as a
		PUT carrying the listenKey - confirm that POST is sufficient.
		"""
		r = requests.post(f'{self.base_api}{self.ext}', headers=self.headers)
		self.listen_key = json.loads(r.text)['listenKey']
		logging.info(r.text)

def heikin_ashi(df):
	"""Build Heikin-Ashi candles from a frame of regular candles.

	Args:
		df: DataFrame with 'open', 'high', 'low', 'close' and 'date' columns.
			It is not modified.

	Returns:
		DataFrame indexed like ``df`` with numeric columns 'open', 'high',
		'low', 'close' plus a 'date' column of ``pd.Timestamp``; missing
		values are replaced by 0.
	"""
	# dtype=float keeps every price column numeric (an untyped empty frame
	# would leave 'open' as an object column)
	heikin_ashi_df = pd.DataFrame(index=df.index.values, columns=['open', 'high', 'low', 'close'], dtype=float)
	heikin_ashi_df['close'] = (df['open'] + df['high'] + df['low'] + df['close']) / 4

	# HA open is recursive: the first value is the raw open, every later one
	# is the mean of the previous HA open and the previous HA close.
	ha_open = []
	for i in range(len(df)):
		if i == 0:
			ha_open.append(df['open'].iloc[0])
		else:
			ha_open.append((ha_open[i-1] + heikin_ashi_df['close'].iloc[i-1]) / 2)
	heikin_ashi_df['open'] = np.asarray(ha_open, dtype=float)

	heikin_ashi_df['high'] = heikin_ashi_df.loc[:, ['open', 'close']].join(df['high']).max(axis=1)
	heikin_ashi_df['low'] = heikin_ashi_df.loc[:, ['open', 'close']].join(df['low']).min(axis=1)
	heikin_ashi_df['date'] = df.date.apply(pd.Timestamp)
	return heikin_ashi_df.fillna(0)

def add_features():
	"""Run the global feature engineer on ``df`` and copy its columns back.

	Only the seven raw candle columns are handed to ``fe.preprocess_data``;
	every column of the result is then written into the global ``df``.
	"""
	global fe, df
	raw_candles = df[['date', 'open', 'high', 'low', 'close', 'volume', 'tic']]
	enriched = fe.preprocess_data(raw_candles)
	for name in enriched.columns:
		df[name] = enriched[name]
	logging.info('features added')

def report(msg, chat_id=348368436):
	"""Send ``msg`` as a Telegram message through the global ``updater``.

	Args:
		msg: text to send.
		chat_id: target chat; defaults to the chat id this notebook has
			always reported to, so existing callers are unaffected.
	"""
	global updater
	updater.bot.send_message(chat_id=chat_id, text=msg)

def exit_long():
	"""Close a long position by selling half of the base balance.

	Resets ``state`` to 'None', clears the ``lb``/``ub`` levels, stores the
	relative profit in ``prof_usdt_pc`` and reports it.

	Returns:
		0
	"""
	global state, action
	global price_base, lb, ub
	global entry_price, prof_usdt_pc
	init_sell_sequence(0.5)
	state = 'None'
	action = 'exit_long'
	# lb/ub are declared global above so that this reset reaches the
	# module-level values that decide_act prints and log_step stores
	lb, ub = np.nan, np.nan
	# NOTE(review): the profit is divided by the exit price here, while
	# exit_short divides by the entry price - confirm which is intended.
	prof_usdt_pc = (price_base - entry_price)/price_base
	msg = f'exit long with a {"profit " if prof_usdt_pc > 0 else "loss "} of {prof_usdt_pc*100:.2f} %'
	reportAndLog(msg)
	cprint(msg, 'green', 'on_blue')
	return 0

def exit_short():
	"""Close a short position by buying base with half of the quote balance.

	Resets ``state`` to 'None', clears the ``lb``/``ub`` levels, stores the
	relative profit in ``prof_btc_pc`` and reports it.

	Returns:
		0
	"""
	global state, action
	global price_base, lb, ub
	global entry_price, prof_btc_pc
	init_buy_sequence(0.5)
	state = 'None'
	action = 'exit_short'
	# lb/ub are declared global above so that this reset reaches the
	# module-level values that decide_act prints and log_step stores
	lb, ub = np.nan, np.nan
	prof_btc_pc = (entry_price - price_base)/entry_price
	msg = f'exit short with a {"profit " if prof_btc_pc > 0 else "loss "} of {prof_btc_pc*100:.2f} %'
	reportAndLog(msg)
	cprint(msg, 'red', 'on_blue')
	return 0

def go_long():
	"""Enter a long position: buy base with the whole quote balance.

	Clears both profit figures, runs the buy sequence, remembers the entry
	price and derives the ``lb``/``ub`` levels from ``risk``/``reward``.

	Returns:
		0
	"""
	global state, action
	global lb, ub, price_base
	global prof_btc_pc, prof_usdt_pc
	global entry_price
	prof_btc_pc = np.nan
	prof_usdt_pc = np.nan
	init_buy_sequence(1.0)

	# price_base is read only after the buy sequence has run
	entry_price = np.copy(price_base)

	state, action = 'in_long', 'go_long'
	lower_level = price_base*(1-risk)
	upper_level = price_base*(1+reward)
	lb, ub = lower_level, upper_level
	logging.info(f'go long')
	cprint(f'go long', 'green', attrs=['bold'])
	return 0

def go_short():
	"""Enter a short position: sell the whole base balance.

	Clears both profit figures, runs the sell sequence, remembers the entry
	price and derives the ``lb``/``ub`` levels from ``reward``/``risk``.

	Returns:
		0
	"""
	global state, action, lb, ub
	global prof_btc_pc, prof_usdt_pc
	global entry_price
	prof_btc_pc = np.nan
	prof_usdt_pc = np.nan
	init_sell_sequence(1.0)
	# price_base is read only after the sell sequence has run
	entry_price = np.copy(price_base)
	state, action = 'in_short', 'go_short'
	lower_level = price_base*(1-reward)
	upper_level = price_base*(1+risk)
	lb, ub = lower_level, upper_level
	logging.info(f'go short')
	cprint(f'go short', 'red', attrs=['bold'])
	return 0

def hold_position():
	"""Keep the current position: clear the per-step figures and log it.

	Returns:
		0
	"""
	global state, action
	global valid_base_quantity
	global prof_btc_pc, prof_usdt_pc
	# nothing was traded in this step, so there is no profit or deal size
	prof_btc_pc = prof_usdt_pc = valid_base_quantity = np.nan
	status = f'holding {state}'
	print(status)
	logging.info(status)
	action = 'None'
	return 0

def await_entry():
	"""Stay out of the market: clear the per-step figures and log it.

	Returns:
		0
	"""
	global state, action
	global valid_base_quantity
	global prof_btc_pc, prof_usdt_pc
	# nothing was traded in this step, so there is no profit or deal size
	prof_btc_pc = prof_usdt_pc = valid_base_quantity = np.nan
	status = f'awaiting entry'
	print(status)
	logging.info(status)
	action = 'awaiting entry'
	return 0

def rawDataToDataFrame(data):
	"""Convert raw kline rows into an OHLCV DataFrame.

	Args:
		data: sequence of kline rows; field 0 is the open time in
			milliseconds and fields 1-5 are open, high, low, close, volume.

	Returns:
		DataFrame indexed by the (local-time) open time with numeric columns
		'Open', 'High', 'Low', 'Close', 'Volume'. When more than one row was
		received, the last row is dropped.

	Raises:
		RuntimeError: if ``data`` contains no rows.
	"""
	df = pd.DataFrame(data)
	# Check for emptiness first: indexing column 0 of an empty frame would
	# fail with an unrelated KeyError.
	if df.shape[0] == 0:
		reportAndLog('no data on refresh_data/rawDataToDataFrame. Exception.')
		raise RuntimeError('no data on refresh_data/rawDataToDataFrame')

	df[0] = df[0].map(lambda x: datetime.fromtimestamp(x/1000))
	df.index = pd.to_datetime(df[0])
	# keep only the five price/volume fields
	df = df.loc[:, [1, 2, 3, 4, 5]].copy()
	df.columns = ['Open',
				  'High',
				  'Low',
				  'Close',
				  'Volume']
	for column in ['Open', 'Close', 'Low', 'High', 'Volume']:
		df[column] = pd.to_numeric(df[column])
	# NOTE(review): this stores 1000 * volume**2, exactly as the original
	# `df['Volume'] *= volume * 1000` did - confirm whether a plain
	# `volume * 1000` scaling was intended.
	df['Volume'] = df['Volume'] * (df['Volume'] * 1000)

	if df.shape[0] == 1:
		return df.copy()
	# NOTE(review): the last row is presumably the still-open candle - confirm
	return df.iloc[:-1].copy()

def wait():
	"""Block until ``next_time`` and then schedule the following refresh.

	The new ``next_time`` is the start of the current ``n_minutes`` slot
	plus one slot plus 8 seconds.
	"""
	global interval, next_time, n_minutes
	announcement = f'Waiting for {interval} ({n_minutes} minutes) until the next refresh @ {str(pd.to_datetime(next_time))}'
	print(announcement)
	logging.info(announcement)
	while True:
		if not datetime.now() < next_time:
			break
		time.sleep(0.2)
	now = datetime.now()
	slot = timedelta(minutes=n_minutes)
	# (now - datetime.min) % slot is the time elapsed inside the current slot
	slot_start = now - (now - datetime.min) % slot
	next_time = slot_start + timedelta(minutes=n_minutes, seconds=8)
	print(f'NOW: {str(pd.to_datetime(now))}, NEXT TIME: {str(pd.to_datetime(next_time))}')

def refresh_data(seconds_back=None):
	"""Download recent klines, merge them into the global ``df``, add features.

	Args:
		seconds_back: how far back (in seconds) to request candles; when
			None the global ``lookback_s`` is used.

	Raises:
		Exception: the last download error, re-raised once more than
			``max_reconnects`` reconnect attempts have failed.
	"""
	global client, lookback_s, symbol, interval, max_reconnects, df

	look_back_s = lookback_s if seconds_back is None else seconds_back

	reconnects = 0
	while True:
		try:
			start_str = int(1000*(time.time() - look_back_s)) # must pass integer MILLISECONDS!!!
			data = client.get_historical_klines(symbol, interval, start_str, limit=1000)
			break
		except Exception:
			# `except Exception` (not a bare except) so that a keyboard
			# interrupt is not turned into another retry
			time.sleep(5)
			reconnect()
			reconnects += 1
			if reconnects > max_reconnects:
				logging.info(f'Could not reconnect after {max_reconnects} attempts. Exiting.')
				raise

	fresh_data = rawDataToDataFrame(data)
	fresh_data['tic'] = symbol
	fresh_data = fresh_data.reset_index()
	fresh_data.columns = ['date', 'open', 'high', 'low', 'close', 'volume', 'tic']
	# pd.concat replaces DataFrame.append, which no longer exists in pandas 2.x;
	# keep='first' keeps the already stored row when a date arrives again
	combined = fresh_data if df.empty else pd.concat([df, fresh_data])
	df = combined.drop_duplicates(subset=['date'], keep='first').reset_index(drop=True)
	add_features()

def decide_act():
	"""Dispatch on the current ``state``/``signal`` pair.

	Raises:
		OSError: if the pair matches none of the handled combinations.
	"""
	global state, signal, price_base, lb, ub
	cprint(f'{state}: {lb} {price_base} {ub}', 'magenta', attrs=['bold'])

	in_position = state in ('in_long', 'in_short')
	if state == 'in_long' and signal in ('exit_long', 'abort_long'):
		exit_long()
	elif state == 'in_short' and signal in ('exit_short', 'abort_short'):
		exit_short()
	elif state == 'None' and signal == 'buy':
		go_long()
	elif state == 'None' and signal == 'sell':
		go_short()
	elif in_position and signal == 'hold':
		hold_position()
	elif state == 'None' and signal in ('hold', 'await_entry'):
		await_entry()
	else:
		logging.info(f'Signal: {signal} state: {state} action: {action} price_base: {price_base}')
		raise OSError('Something wrong in decide_act')
		
def refresh_signal():
	"""Recompute the global ``signal`` from the latest EMAs and price.

	Out of the market (``state == 'None'``) the signal becomes 'buy', 'sell'
	or 'await_entry'; inside a position it becomes 'abort_long' /
	'abort_short' when the price crosses the slow EMA, otherwise 'hold'.
	"""
	global df, signal, price_base, state
	check_price()
	fast_ema = df.iloc[-1][f'close_{fast_ema_period}_ema']
	slow_ema = df.iloc[-1][f'close_{slow_ema_period}_ema']
	if (state == 'None'):
		if (fast_ema > slow_ema):
			if (price_base > fast_ema):
				signal = 'buy'
			else:
				signal = 'await_entry'
				cprint('fast ema above slow, but price is not, waiting...', 'grey', attrs=['bold'])
				logging.info('fast ema above slow, but price is not, waiting...')
		elif (fast_ema < slow_ema):
			if (price_base < fast_ema):
				signal = 'sell'
			else:
				signal = 'await_entry'
				cprint('fast ema below slow, but price is not, waiting...', 'grey', attrs=['bold'])
				logging.info('fast ema below slow, but price is not, waiting...')
		else:
			# EMAs are equal or not comparable (NaN). Without this branch the
			# signal of the previous step (e.g. 'abort_long' right after an
			# exit) would survive and make decide_act raise.
			signal = 'await_entry'
			logging.info('fast and slow ema are equal or undefined, waiting...')
	elif (state == 'in_long') and (price_base < slow_ema):
		signal = 'abort_long'
		msg = 'we are goint to abort the long'
		reportAndLog(msg)
		cprint(msg, 'yellow', attrs=['bold'])
	elif (state == 'in_short') and (price_base > slow_ema):
		signal = 'abort_short'
		msg = 'we are going to abort the short'
		reportAndLog(msg)
		cprint(msg, 'yellow', attrs=['bold'])
	else:
		signal = 'hold'
	logging.info(f'refreshed signal to: {signal}. State: {state}')

def reconnect():
	"""Best-effort replacement of the global ``client`` with a new ``Client``.

	A failure is printed and logged together with its reason; it is never
	raised, and the previous ``client`` stays in place.
	"""
	global client, api_key, api_secret
	try:
		client = Client(api_key, api_secret)
		cprint('reconnected', 'red', 'on_yellow')
		logging.info('reconnected')
	except Exception as exc:
		# `except Exception` keeps a keyboard interrupt from being swallowed
		cprint('reconnecting failed', 'red', 'on_yellow')
		logging.info(f'reconnecting failed: {exc!r}')

def check_open_orders():
	"""Tell whether any open order exists for the global ``symbol``.

	Returns:
		1 if at least one open order is for ``symbol``, otherwise 0.

	Raises:
		Exception: the last query error, re-raised once more than
			``max_reconnects`` reconnect attempts have failed.
	"""
	global symbol
	reconnects = 0
	while True:
		try:
			open_orders = client.get_open_orders()
			# inspect every open order, not only the first one
			return 1 if any(order['symbol'] == symbol for order in open_orders) else 0
		except Exception:
			time.sleep(5)
			reconnect()
			reconnects += 1
			if reconnects > max_reconnects:
				msg = f'Could not check_open_orders after {max_reconnects} attempts. Exiting.'
				print(msg)
				logging.info(msg)
				raise

def check_balance():
	"""Refresh the global ``base_amount``/``quote_amount`` with the free balances.

	An asset that is missing from the account's balance list leaves the
	corresponding global unchanged.

	Raises:
		Exception: the last query error, re-raised once more than
			``max_reconnects`` reconnect attempts have failed.
	"""
	global client, base_currency, quote_currency, base_amount, quote_amount, max_reconnects

	reconnects = 0
	while True:
		try:
			account = client.get_account()
			break
		except Exception:
			# `except Exception` keeps a keyboard interrupt from being retried
			time.sleep(5)
			reconnect()
			reconnects += 1
			if reconnects > max_reconnects:
				msg = f'Could not check_balance after {max_reconnects} attempts. Exiting.'
				print(msg)
				logging.info(msg)
				raise

	for b in account['balances']:
		if b['asset'] == base_currency:
			base_amount = float(b["free"])
		if b['asset'] == quote_currency:
			quote_amount = float(b["free"])

def check_price():
	"""Store the current ticker price of ``symbol`` in the global ``price_base``.

	Raises:
		Exception: the last query error, re-raised once more than
			``max_reconnects`` reconnect attempts have failed.
	"""
	global price_base, max_reconnects
	reconnects = 0
	while True:
		try:
			price_base = float(client.get_symbol_ticker(symbol=symbol)['price'])
			return
		except Exception:
			# `except Exception` keeps a keyboard interrupt from being retried
			time.sleep(5)
			reconnect()
			reconnects += 1
			if reconnects > max_reconnects:
				logging.info(f'Could not check the price after {max_reconnects} attempts. Exiting.')
				raise

def validate_order(price, quantity):
	"""Fit an order size to the MIN_NOTIONAL and stepSize constraints.

	Args:
		price: price of one unit of base currency, in quote currency.
		quantity: desired amount of base currency.

	Returns:
		``quantity`` rounded down to a whole multiple of ``stepSize``, or 0
		when ``price * quantity`` is below ``MIN_NOTIONAL`` (before or after
		the rounding). A ``stepSize`` of 0 or less disables the step check.
	"""
	global MIN_NOTIONAL, stepSize
	# local import: exact decimal arithmetic for the step check
	from decimal import Decimal, ROUND_DOWN

	if MIN_NOTIONAL <= price * quantity:
		logging.info('MIN_NOTIONAL passed')
	else:
		logging.info(f'MIN_NOTIONAL FAILED: increase the value of (price x quantity) (now you have {price * quantity}.), Setting quantity to ZERO')
		quantity = 0

	if stepSize > 0 and quantity != 0:
		# Float modulo is unreliable here (0.3 % 0.1 is not 0), so count the
		# whole steps in decimal arithmetic and round down.
		step = Decimal(str(float(stepSize)))
		whole_steps = (Decimal(str(float(quantity))) / step).to_integral_value(rounding=ROUND_DOWN)
		recommended_quantity = float(whole_steps * step)
	else:
		recommended_quantity = quantity

	if recommended_quantity == quantity:
		logging.info('stepSize passed')
	else:
		logging.info(f"stepSize FAILED: Your deal size is not a multiple of stepSize. You could sell {recommended_quantity} of BASE CURRENCY. FIXING the quantity: to {recommended_quantity} of BASE CURRENCY.")
		quantity = recommended_quantity
		# rounding down may have pushed the order below the minimum notional
		if not MIN_NOTIONAL <= price * quantity:
			logging.info(f'MIN_NOTIONAL FAILED: increase the value of (price x quantity) (now you have {price * quantity}.), Setting quantity to ZERO')
			quantity = 0
	return quantity

def sell_base():
	"""Place a GTC limit order selling ``valid_base_quantity`` at ``price_base``.

	On failure the error is logged, the client is re-created after 5 s and
	the order is submitted again.

	Returns:
		0 once an order was accepted.

	Raises:
		Exception: the last submission error, re-raised once more than
			``max_reconnects`` attempts have failed.
	"""
	# price of BASE currency in quote currency (e.g. BTCUSDT - BTC (base), USDT (quote))
	global client, symbol, price_base, valid_base_quantity

	reconnects = 0
	while True:
		try:
			client.create_order(
				symbol=symbol, # we SELL the specified quantity of BASE currency at the given price for USDT
				side='SELL',
				type='LIMIT',
				timeInForce='GTC',
				quantity=valid_base_quantity,
				price=price_base)
		except Exception as exc:
			print("Couldn't place sell order. Reconnecting in 5 s")
			logging.info("Couldn't place sell order. Reconnecting in 5 s")
			# log the error itself: no order object exists when create_order raised
			logging.info(repr(exc))
			time.sleep(5)
			reconnect()
			reconnects += 1
			if reconnects > max_reconnects:
				logging.info(f'Could not reconnect after {max_reconnects} attempts. Exiting.')
				raise
		else:
			print('Sell order placed')
			logging.info('Sell order placed')
			return 0

def buy_base():
	"""Place a GTC limit order buying ``valid_base_quantity`` at ``price_base``.

	On failure the error is logged, the client is re-created after 5 s and
	the order is submitted again.

	Returns:
		0 once an order was accepted.

	Raises:
		Exception: the last submission error, re-raised once more than
			``max_reconnects`` attempts have failed.
	"""
	# price of BASE currency in quote currency (e.g. BTCUSDT - BTC (base), USDT (quote))
	global client, symbol, price_base, valid_base_quantity

	reconnects = 0
	while True:
		try:
			client.create_order(
				symbol=symbol, # we BUY the specified quantity of BASE at the given price for USDT
				side='BUY',
				type='LIMIT',
				timeInForce='GTC',
				quantity=valid_base_quantity,
				price=price_base)
		except Exception as exc:
			print("Couldn't place buy order. Reconnecting in 5 s")
			logging.info("Couldn't place buy order. Reconnecting in 5 s")
			# log the error itself: no order object exists when create_order raised
			logging.info(repr(exc))
			time.sleep(5)
			reconnect()
			reconnects += 1
			if reconnects > max_reconnects:
				logging.info(f'Could not reconnect after {max_reconnects} attempts. Exiting.')
				raise
		else:
			print('Buy order placed')
			logging.info('Buy order placed')
			return 0

def init_buy_sequence(param):
	"""Buy base currency with the fraction ``param`` of the quote balance.

	Refreshes balance and price, validates the order size, places the buy
	order and polls every 5 s until no order for the symbol is open (giving
	up after more than 100 polls), then refreshes the balance again.

	Args:
		param: fraction of the free quote balance to spend.

	Raises:
		OSError: if an order is already open or the validated size is not
			positive.
	"""
	global client, symbol, base_currency, quote_currency
	global base_amount, quote_amount
	global price_base, valid_base_quantity
	global max_reconnects

	if check_open_orders() != 0:
		reason = f'ERROR: There are open orders. ABORTING.'
		logging.info(reason)
		raise OSError(reason)

	check_balance() # there's a min amount you can sell, or buy in BTC (base currency)
	check_price()

	base_to_buy = quote_amount/price_base * param
	valid_base_quantity = validate_order(price_base, base_to_buy)
	if not valid_base_quantity > 0:
		reason = f'valid_base_quantity is {valid_base_quantity} which is not enough. ABORTING.'
		logging.info(reason)
		raise OSError(reason)

	buy_base()
	msg = f'Ordered to BUY {valid_base_quantity} {base_currency} @ {price_base} {quote_currency}.'
	cprint(msg, 'green', attrs=['bold'])
	reportAndLog(msg)

	# wait until the order is filled
	polls = 0
	while check_open_orders() != 0:
		polls += 1
		time.sleep(5)
		print('the buy order is still not filled. Waiting for 5 seconds.')
		logging.info('Buying. the order is still not filled. Waiting for 5 seconds.')
		if polls > 100:
			msg = 'Order not filled completely. Moving on.'
			cprint(msg, 'blue', attrs=['bold'])
			logging.info(msg)
			break

	# refresh balance after waiting for the order
	check_balance()
	msg = f'Buy order filled. Balance: BASE: {base_amount} QUOTE:{quote_amount}'
	cprint(msg, 'cyan', attrs=['bold'])
	logging.info(msg)

def init_sell_sequence(param):
	"""Sell the fraction ``param`` of the free base balance.

	Refreshes balance and price, validates the order size, places the sell
	order and polls every 5 s until no order for the symbol is open (giving
	up after more than 100 polls), then refreshes the balance again.

	Args:
		param: fraction of the free base balance to sell.

	Raises:
		OSError: if an order is already open or the validated size is not
			positive.
	"""
	global client, symbol, base_currency, quote_currency
	global base_amount, quote_amount
	global price_base, valid_base_quantity
	global max_reconnects

	if check_open_orders() != 0:
		reason = f'Not SELLING anything. There are unfilled orders'
		logging.info(reason)
		raise OSError(reason)

	check_balance() # there's a min amount you can sell, or buy in BTC (base currency)
	check_price()

	valid_base_quantity = validate_order(price_base, base_amount * param)
	if not valid_base_quantity > 0:
		reason = f'valid_base_quantity is {valid_base_quantity} which is not enough. ABORTING.'
		logging.info(reason)
		raise OSError(reason)

	# place sell order
	sell_base()
	msg = f'Place order to SELL {valid_base_quantity} {base_currency} @ {price_base} {quote_currency}.'
	cprint(msg, 'red', attrs=['bold'])
	reportAndLog(msg)

	# wait until the order is filled
	polls = 0
	while check_open_orders() != 0:
		polls += 1
		time.sleep(5)
		print('the sell order is still not filled. Waiting for 5 seconds.')
		logging.info('selling. the order is still not filled. Waiting for 5 seconds.')
		if polls > 100:
			msg = 'Order not filled completely. Moving on.'
			cprint(msg, 'blue', attrs=['bold'])
			logging.info(msg)
			break

	# refresh balance after waiting for the order
	check_balance()
	msg = f'Sell order filled. Balance: BASE: {base_amount} QUOTE:{quote_amount}'
	cprint(msg, 'cyan', attrs=['bold'])
	logging.info(msg)

def log_step():
	"""Write the current trading state into the last row of ``df`` and save it.

	Stores the levels, signal/state/action, deal details (only for the four
	trading actions), balances, profits, cumulative commission and the
	hold/liquidation valuations, then dumps ``df`` to 'ss.csv'.
	"""
	global signal, state, action
	global df, lb, ub
	global prof_usdt_pc, prof_btc_pc
	global quote_amount, base_amount, price_base, valid_base_quantity
	lr = df.iloc[-1].name

	print('Last date: ', df.iloc[-1]['date'])

	def store(column, value):
		# write one cell of the last row
		df.at[lr, column] = value

	store('lb', lb)
	store('ub', ub)
	store('signal', signal)
	store('state', state)
	store('action', action)
	traded = action in ['go_long', 'go_short', 'exit_long', 'exit_short']
	store('deal_price', price_base if traded else np.nan)
	store('deal_amount', valid_base_quantity if traded else np.nan)
	store('base_amount', base_amount)
	store('quote_amount', quote_amount)
	store('prof_btc_pc', prof_btc_pc)
	store('prof_usdt_pc', prof_usdt_pc)
	store('cum_commission', cum_commission)

	# value of the initial holdings at the current price
	store('hodl_base', init_base * price_base)
	store('hodl_quote', init_quote / price_base)

	# value of the current holdings if fully converted to base / to quote
	store('liq_base', base_amount + quote_amount/price_base)
	store('liq_quote', quote_amount  + base_amount * price_base)

	df.to_csv('ss.csv')

def reportPos(heiknAshi):
	"""Plot the recent candles with the deals and save 'latest_deals.png'.

	Args:
		heiknAshi: if truthy the candles are drawn from the Heikin-Ashi
			transform of the data, otherwise from the data itself.
	"""
	global df, fast_ema_period, slow_ema_period, init_lookback_s
	# four stacked panels sharing the x axis; the first one is the tallest
	fig, (ax, ax1, ax2, ax3) = plt.subplots(4,1, figsize=(16,7), gridspec_kw={'height_ratios':[7,1,1,1]}, sharex=True)
	tmp = df.copy()
	tmp['date'] = pd.DatetimeIndex(tmp.date)
	# (the assignment above is immediately overwritten by this one)
	tmp['date'] = tmp.date.apply(lambda x: pd.Timestamp(x))

	# keep only the rows of the last init_lookback_s seconds
	now = pd.to_datetime("today")
	st = now - pd.DateOffset(seconds=init_lookback_s)
	tmp = tmp[(tmp.date > st) & (tmp.date <= now)].reset_index(drop=True)

	ha = heikin_ashi(tmp)
	if heiknAshi:
		mpl.plot(ha.set_index('date'), type='candle', ax=ax)
	else:
		mpl.plot(tmp.set_index('date'), type='candle', ax=ax)

	# both EMAs go on the candle panel, the cci on the last panel
	tmp.plot(y=[f'close_{fast_ema_period}_ema'], ax=ax, alpha=0.5, use_index=True)
	tmp.plot(y=[f'close_{slow_ema_period}_ema'], ax=ax, alpha=0.5, use_index=True)
	tmp.plot(y=['cci'], ax=ax3, alpha=1, linewidth=0.5, use_index=True)

	# tmp.plot(y=['ub', 'lb'], ax=ax, lw=0.5, legend=None, markersize=4, linestyle='none', marker='o', use_index=True)

	# one marker style per action, in the order of the list iterated below
	marker = ['X', 'v', 'X', '^']
	alpha = [0.3, 1.0, 0.3, 1.0]
	color = ['green', 'red', 'red', 'green']

	for i, action in enumerate(['exit_long', 'go_short', 'exit_short', 'go_long']):
		tmp[tmp.action==action].plot(y='close',
			ax=ax,
			marker=marker[i],
			markersize=12,
			alpha = alpha[i],
			legend=None,
			color=color[i],
			linestyle='none',
			use_index=True)

	# annotate every row that carries a profit figure
	for i, tup in enumerate(tmp.iterrows()):
		act = tup[1]['action']
		if act == 'exit_long':
			prof = tup[1]['prof_usdt_pc']*100.0
		else:
			prof = tup[1]['prof_btc_pc']*100.0
		# x position of the label: one step to the right of the row position
		date = i + 1
		price = tup[1]['close']
		# note: from here on `color` is a string, no longer the list above
		color = 'green' if prof > 0 else 'red'
		if not np.isnan(prof):
			ax.annotate(f'{prof:.2f}%', (date, price), fontsize=10, color=color)

	# tmp.plot(y='ewm', ax=ax2, label='Price deviation from EMA', use_index=True)
	# ax2twin is created but nothing is drawn on it
	ax2twin = ax2.twinx()

	tmp.plot(y='volume', ax=ax1, use_index=True)
	ax1.axhline(0, lw=0.5)
	ax2.axhline(0, lw=0.5)
	ax.grid('on')

	fig.subplots_adjust(hspace=0)
	plt.savefig('latest_deals.png', dpi=300)
	plt.close()

def getStatus(update, context):
    """Telegram handler: report every column of the last ``df`` row.

    Each column is sent as one ``name:<TAB>value`` line through ``report``.
    """
    global df
    latest = df.iloc[-1].to_dict()
    report(''.join(f'{name}:\t{value}\n' for name, value in latest.items()))

def reportPerf():
	"""Plot the performance of the bot and save 'latest_performance.png'.

	Three panels are drawn: price with EMAs and deal markers, the
	liquidation/hold valuations, and their relative change since the first
	row that has a 'liq_quote' value.
	"""
	global df, slow_ema_period, fast_ema_period, symbol
	res = df.copy()

	slow_ema = f'close_{slow_ema_period}_ema'
	fast_ema = f'close_{fast_ema_period}_ema'

	fig, ax = plt.subplots(3,1, figsize=(17,10), gridspec_kw={'height_ratios':[7,7,7]})

	# one marker style per action, in the order of the list iterated below
	marker = ['X', 'v', 'X', '^']
	alpha = [0.3, 1.0, 0.3, 1.0]
	color = ['green', 'red', 'red', 'green']
	for i, action in enumerate(['exit_long', 'go_short', 'exit_short', 'go_long']):
		res[res.action==action].plot(y='close',
			ax=ax[0],
			marker=marker[i],
			markersize=8,
			alpha = alpha[i],
			legend=None,
			color=color[i],
			linestyle='none',
			use_index=True)
	res.plot(y=['close', slow_ema, fast_ema], ax=ax[0], alpha=0.5, use_index=True)
	# quote-denominated curves on ax[1], base-denominated ones on its twin axis
	axt = ax[1].twinx()
	res.plot(x='date', y=['liq_quote', 'hodl_base'], color='blue', style=['-', ':'], ax=ax[1])
	res.plot(x='date', y=['liq_base', 'hodl_quote'], color='red', style=['-', ':'], ax=axt)
	ax[0].grid('on')
	ax[1].grid('on')

	# first row with a liq_quote value; used as the reference row below
	# NOTE(review): first_valid_index() returns an index label (None when no
	# value exists) while it is used with iloc - this relies on df having a
	# 0..n-1 index and at least one logged step; confirm.
	fvi = res.liq_quote.first_valid_index()
	(res.liq_quote/res.iloc[fvi].liq_quote - 1).plot(ax=ax[2], color='blue', style=['-'], label='if you cash out in quote, trade')
	(res.hodl_quote/res.iloc[fvi].hodl_quote - 1).plot(ax=ax[2], color='blue', alpha=0.5, label='if you cash out in quote, invest')
	twn = ax[2].twinx()
	(res.liq_base/res.iloc[fvi].liq_base - 1).plot(ax=ax[2], color='red', style=['-'], label='if you cash out in base, trade')
	(res.hodl_base/res.iloc[fvi].hodl_base - 1).plot(ax=ax[2], color='red', alpha=0.5, label='if you cash out in base, invest')
	ax[2].set_ylabel('profit (in Xs)', color='blue')
	# the twin axis of the third panel only carries the close price
	twn.set_ylabel(f'Close, {symbol}', color='green')
	res.iloc[fvi:].close.plot(ax=twn, color='green')
	twn.axhline(res.iloc[fvi].close, color='grey', lw=0.5)
	ax[2].grid('on')
	ax[2].legend()
	plt.savefig('latest_performance.png', dpi=300)
	plt.close()

def lat4_ha(update, context):
	"""Telegram handler: render the Heikin-Ashi deals chart and send it back."""
	reportPos(True)
	chart = open('latest_deals.png', 'rb')
	try:
		context.bot.send_photo(chat_id=update.effective_chat.id, photo=chart)
	finally:
		chart.close()

def lat5(update, context):
	"""Telegram handler: render the performance chart and send it back."""
	reportPerf()
	chart = open('latest_performance.png', 'rb')
	try:
		context.bot.send_photo(chat_id=update.effective_chat.id, photo=chart)
	finally:
		chart.close()

def lat4(update, context):
	"""Telegram handler: render the plain-candle deals chart and send it back."""
	reportPos(False)
	chart = open('latest_deals.png', 'rb')
	try:
		context.bot.send_photo(chat_id=update.effective_chat.id, photo=chart)
	finally:
		chart.close()

def init():
	"""One-time start-up of the bot.

	In order: derives the timing constants from ``interval``, starts the
	Telegram bot, reads the API keys, creates the Binance client and the
	user-stream socket, loads the symbol's trading filters, queries balance
	and price, rebalances a lopsided portfolio, creates the feature
	engineer, downloads the initial candles and schedules the first refresh.

	Raises:
		OSError: for an unsupported ``interval`` or an unreadable key file.
	"""
	global thread
	global fe
	global next_time, n_minutes, interval, lookback_s, init_lookback_s
	global updater, old_time, df
	global MIN_NOTIONAL, stepSize, precision, client
	global base_currency, quote_currency, base_amount, quote_amount, price_base
	global init_base, init_quote
	global fast_ema_period, slow_ema_period
	global twm
	global api_key, api_secret, telegramApiKey, pathToAPIkeys
	global user_stream_socket

	# candle length in minutes for every supported interval
	minutes_per_interval = {'1m': 1, '5m': 5, '15m': 15, '1h': 60, '4h': 240}
	if interval not in minutes_per_interval:
		logging.info('Wrong interval')
		raise OSError('Wrong interval')
	n_minutes = minutes_per_interval[interval]
	init_lookback_s = 90*60*n_minutes # 90 candles
	lookback_s = 2*60*n_minutes # 2 candles

	# start the telegram bot
	updater = Updater(token=telegramApiKey, use_context=True)
	updater.dispatcher.add_handler(CommandHandler('pos_ha', lat4_ha))
	updater.dispatcher.add_handler(CommandHandler('pos', lat4))
	updater.dispatcher.add_handler(CommandHandler('perf', lat5))
	updater.dispatcher.add_handler(CommandHandler('status', getStatus))
	updater.start_polling()
	print(f'Telegram API running.')
	logging.info(f'Telegram API running.')

	# remember the time
	old_time = datetime.now().replace(second=0, microsecond=0)

	# get the API keys
	try:
		with open(f'{pathToAPIkeys}loadEnv.sh', 'r') as f:
			lines = f.readlines()
	except OSError as exc:
		# only a failure to read the file is reported as "not found";
		# the original error is kept as the cause
		m = f'api_key file not found'
		print(m)
		logging.info(m)
		raise OSError(m) from exc
	for line in lines:
		# The value is whatever follows the last '='. Strip only the line
		# terminator so that a final line without a newline is not truncated.
		value = line.split('=')[-1].rstrip('\r\n')
		for i in line.split('='):
			if i.endswith('binance_api'):
				api_key = value
				break
			if i.endswith('binance_secret'):
				api_secret = value
				break

	# start the client
	client = Client(api_key, api_secret)

	# initialize and start the custom user-stream socket
	user_stream_socket = MyBinanceSocket()
	user_stream_socket.start()

	# load the trading filters of the symbol
	for i in client.get_exchange_info()['symbols']:
		if i['symbol'] == symbol:
			base_currency = i['baseAsset']
			quote_currency = i['quoteAsset']
			precision = int(i['baseAssetPrecision'])
			for j in i['filters']:
				if j['filterType'] == 'MIN_NOTIONAL':
					MIN_NOTIONAL = float(j['minNotional'])
				if j['filterType'] == 'LOT_SIZE':
					stepSize = float(j["stepSize"])
	print('Globals initialized')
	logging.info('Globals initialized')

	# query the balance and current price
	check_balance()
	check_price()
	init_base = base_amount + quote_amount/price_base
	init_quote = quote_amount + base_amount*price_base

	# show the balance
	print(f'BALANCE:\n {base_currency}: {base_amount}\n {quote_currency}: {quote_amount}')
	logging.info(f'BALANCE:\n {base_currency}: {base_amount}\n {quote_currency}: {quote_amount}')

	# Rebalance when one side is worth less than a quarter of the other.
	# The comparisons are written as products so that an empty quote
	# balance does not divide by zero.
	if base_amount*price_base < 0.25*quote_amount:
		init_buy_sequence(0.5)
		print(f'BASE: {base_amount*price_base:.3f}, QUOTE: {quote_amount:.3f} in QUOTE')
	if base_amount*price_base > 4.0*quote_amount:
		init_sell_sequence(0.5)
		print(f'BASE: {base_amount*price_base:.3f}, QUOTE: {quote_amount:.3f} in QUOTE')

	# instantiate the feature engineer (before calling refresh_data)
	fe = FeatureEngineer(
						use_technical_indicator=True,
						tech_indicator_list = [f'close_{fast_ema_period}_ema',
												f'close_{slow_ema_period}_ema', 'cci'],
						use_turbulence=False,
						user_defined_feature = False)

	# getting some initial data (to calculate oscillators etc.)
	m = f'Fetching an initial segment of data of 90 candles...'
	print(m)
	logging.info(m)
	refresh_data(init_lookback_s)

	print(f'Success !')
	logging.info(f'Success !')
	logging.info(f'init_lookback_s: {init_lookback_s}')
	logging.info(f'lookback_s: {lookback_s}')

	# first refresh: 4 seconds after the start of the next n_minutes slot
	now = datetime.now()
	next_time = now - (now - datetime.min) % timedelta(minutes=n_minutes) + timedelta(minutes=n_minutes, seconds=4)

def reportAndLog(msg):
	"""Send ``msg`` through ``report`` and then write it to the log."""
	for deliver in (report, logging.info):
		deliver(msg)

def try_catch(func, retry_after_s=1):
	"""Call ``func()`` until it succeeds.

	Args:
		func: callable taking no arguments.
		retry_after_s: seconds to sleep before the next attempt; with a value
			of 0 or less the error is reported and re-raised instead.

	Raises:
		Exception: whatever ``func`` raised, when ``retry_after_s`` is not
			positive. KeyboardInterrupt/SystemExit are never retried.
	"""
	while True:
		try:
			func()
			return
		except Exception:
			# `except Exception` (not a bare except) so that interrupting the
			# kernel stops the loop instead of triggering another retry
			msg = f'Exception in {func.__name__}. Retrying in {retry_after_s} s.'
			reportAndLog(msg)
			if retry_after_s > 0:
				time.sleep(retry_after_s)
			else:
				raise

def main():
	"""Main trading loop; runs until the global ``stopMain`` becomes true.

	Every pass restarts the user-stream socket if its thread is dead, waits
	for the next refresh time and then runs the four steps. The flag is
	checked once per pass and is reset to False when the loop ends.
	"""
	global stopMain, user_stream_socket
	while not stopMain:
		if not user_stream_socket.is_alive():
			user_stream_socket = MyBinanceSocket()
			user_stream_socket.start()
		wait()
		# (step, seconds before a retry); a delay of 0 means "do not retry"
		steps = (
			(refresh_data, 600),
			(refresh_signal, 600),
			(decide_act, 0),
			(log_step, 0),
		)
		for step, delay in steps:
			try_catch(step, retry_after_s=delay)
	stopMain = False
	print ('Main loop stopped')

# One-time start-up: connects to Telegram and Binance, may place a
# rebalancing order, and downloads the initial candles.
init()

Telegram API running.
Globals initialized
BALANCE:
 BTC: 7.1e-07
 BUSD: 1392.32592026
Buy order placed
[1m[32mOrdered to BUY 0.01396 BTC @ 49854.65 BUSD.[0m
the buy order is still not filled. Waiting for 5 seconds.
the buy order is still not filled. Waiting for 5 seconds.
the buy order is still not filled. Waiting for 5 seconds.
the buy order is still not filled. Waiting for 5 seconds.
the buy order is still not filled. Waiting for 5 seconds.
[1m[36mBuy order filled. Balance: BASE: 0.01396071 QUOTE:696.35500626[0m
BASE: 696.006, QUOTE: 696.355 in QUOTE
Success !


In [18]:
# restart the User Stream socket
# (this only creates and starts a new socket thread; a previous socket
# object, if still running, is not stopped by this cell)

user_stream_socket = MyBinanceSocket()
user_stream_socket.start()

In [19]:
# user_stream_socket.stop()

In [20]:
# launch this cell if you want a kline socket

# ws1 = MyBinanceSocket(klines=True)
# ws1.start()

In [21]:
# stop the kline socket
# `ws1` only exists when the optional kline cell above has been run, so look
# it up first instead of failing with a NameError

ws1 = globals().get('ws1')
if ws1 is not None:
	ws1.stop()

In [33]:
# show whether the user-stream socket thread is running and which listenKey it holds
# print(f'Kline socket is alive: {ws1.is_alive()}')
print(f'User stream socket is alive: {user_stream_socket.is_alive()}')
print(f'listen_key: {user_stream_socket.listen_key}')

Got a ping! Klines: False


In [6]:
# stop the main loop
# threading.Thread has no stop() method; main() ends on its own once it sees
# this flag, which it checks at the start of each pass (i.e. after the
# current wait and trading steps have finished)
stopMain = True

In [9]:
# run the main loop in a background thread so that the notebook stays usable

thread = threading.Thread(target=main)
thread.start()

Waiting for 1h (60 minutes) until the next refresh @ 2021-09-05 02:00:04
NOW: 2021-09-05 02:00:04.098974, NEXT TIME: 2021-09-05 03:00:08
[1m[30mfast ema above slow, but price is not, waiting...[0m
[1m[35mNone: nan 49832.79 nan[0m
awaiting entry
Last date:  2021-09-05 01:00:00
Waiting for 1h (60 minutes) until the next refresh @ 2021-09-05 03:00:08
Got a ping! Klines: False


In [24]:
# inspect the dataframe: list its columns, then show the bookkeeping
# columns written by log_step() from row 87 onwards

print(df.columns)
df.iloc[87:][['date', 'close', 'base_amount', 'quote_amount', 'signal', 'action', 'state', 'deal_price', 'deal_amount', 'cum_commission']]

Index(['date', 'open', 'high', 'low', 'close', 'volume', 'tic', 'close_28_ema',
       'close_84_ema', 'cci', 'lb', 'ub', 'signal', 'state', 'action',
       'deal_price', 'deal_amount', 'base_amount', 'quote_amount',
       'prof_btc_pc', 'prof_usdt_pc', 'cum_commission', 'hodl_base',
       'hodl_quote', 'liq_base', 'liq_quote'],
      dtype='object')


Unnamed: 0,date,close,base_amount,quote_amount,signal,action,state,deal_price,deal_amount,cum_commission
87,2021-09-04 23:00:00,49990.97,,,,,,,,
88,2021-09-05 00:00:00,49900.21,,,,,,,,
89,2021-09-05 01:00:00,49838.92,0.013961,696.355006,await_entry,awaiting entry,,,,7.500000e-07
90,2021-09-05 02:00:00,49980.00,0.013961,696.355006,await_entry,awaiting entry,,,,7.500000e-07
91,2021-09-05 03:00:00,49912.56,0.013961,696.355006,await_entry,awaiting entry,,,,7.500000e-07
...,...,...,...,...,...,...,...,...,...,...
336,2021-09-15 08:00:00,47119.85,0.028111,0.075670,hold,,in_long,,,6.960980e-03
337,2021-09-15 09:00:00,47165.02,0.028111,0.075670,hold,,in_long,,,6.960980e-03
338,2021-09-15 10:00:00,47070.57,0.028111,0.075670,hold,,in_long,,,6.960980e-03
339,2021-09-15 11:00:00,47196.64,0.028111,0.075670,hold,,in_long,,,6.960980e-03


In [25]:
# show the stop flag of the main loop; this cell only displays it.
# To stop the loop without losing the states, data, etc., set stopMain = True
stopMain

False

In [25]:
# get the cumulative commission summed from the execution reports received so far
cum_commission

0.014699990000000001

In [27]:
# get the last execution report for `symbol` received on the user-stream socket
Msg

{'e': 'executionReport',
 'E': 1629820911957,
 's': 'BTCBUSD',
 'c': 'wghRKXtuxavWkl2CZTdvIQ',
 'S': 'SELL',
 'o': 'LIMIT',
 'f': 'GTC',
 'q': '0.01453100',
 'p': '48308.22000000',
 'P': '0.00000000',
 'F': '0.00000000',
 'g': -1,
 'C': '',
 'x': 'TRADE',
 'X': 'FILLED',
 'r': 'NONE',
 'i': 3175182832,
 'l': '0.00150700',
 'z': '0.01453100',
 'L': '48308.22000000',
 'n': '0.00000000',
 'N': 'BNB',
 'T': 1629820911956,
 't': 225360717,
 'I': 6548835514,
 'w': False,
 'm': True,
 'M': True,
 'O': 1629820804472,
 'Z': '701.96674482',
 'Y': '72.80048754',
 'Q': '0.00000000'}

In [32]:
# get the time of the next scheduled refresh of the main loop
str(next_time)

'2021-10-01 13:00:08'

In [35]:
# check the stop flag of the main loop (to stop the loop, set stopMain = True)
stopMain

False

In [29]:
# check if the thread running the main loop is alive
thread.is_alive()

True

In [30]:
# check if the thread in which the user_stream socket is running is alive
# (main() starts a new socket on its next pass when this is False)

user_stream_socket.is_alive()

True

In [31]:
# display the stop flag of the main loop
stopMain

False