Permalink
Fetching contributors…
Cannot retrieve contributors at this time
executable file 844 lines (776 sloc) 38.4 KB
#! /usr/bin/env python
"""
The MIT License (MIT)
Copyright (c) 2015 creon (creon.nu@gmail.com)
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE
OR OTHER DEALINGS IN THE SOFTWARE.
"""
import SimpleHTTPServer
import SocketServer
import BaseHTTPServer
import cgi
import os
from math import ceil
from thread import start_new_thread
from logging.handlers import TimedRotatingFileHandler
from exchanges import *
from utils import *
import config
_wrappers = {'bittrex': Bittrex, 'poloniex': Poloniex, 'ccedk': CCEDK, 'bitcoincoid': BitcoinCoId, 'bter': BTER,
'testing': Peatio, 'cryptsy': Cryptsy}
for e in config._interest:
_wrappers[e] = _wrappers[e]()
for u in config._interest[e]:
for s in ['bid', 'ask']:
config._interest[e][u][s]['orders'] = []
config._interest[e][u][s]['low'] = config._interest[e][u][s]['rate']
config._interest[e][u][s]['high'] = config._interest[e][u][s]['rate']
try:
os.makedirs('logs')
except:
pass
try:
os.makedirs('stats')
except:
pass
dummylogger = logging.getLogger('null')
dummylogger.addHandler(logging.NullHandler())
dummylogger.propagate = False
logname = str(int(time.time() * 100))
creditor = logging.getLogger("credits")
creditor.propagate = False
creditformat = logging.Formatter(fmt='%(asctime)s: %(message)s', datefmt="%Y/%m/%d-%H:%M:%S")
ch = TimedRotatingFileHandler('logs/%s.credits' % logname, when='midnight')
ch.setFormatter(creditformat)
creditor.addHandler(ch)
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
fh = TimedRotatingFileHandler('logs/%s.log' % logname, when='midnight')
fh.setLevel(logging.DEBUG)
sh = logging.StreamHandler()
sh.setLevel(logging.INFO)
formatter = logging.Formatter(fmt='%(asctime)s %(levelname)s: %(message)s', datefmt="%Y/%m/%d-%H:%M:%S")
fh.setFormatter(formatter)
sh.setFormatter(formatter)
logger.addHandler(fh)
logger.addHandler(sh)
_liquidity = []
_active_users = 0
_round = 0
_valflag = False
master = Connection(config._master, logger) if config._master != "" else None
slaves = [CheckpointThread(host, logger) for host in config._slaves]
keys = {}
pricefeed = PriceFeed(15, logger)
lock = threading.Lock()
class NuRPC():
def __init__(self, config, address, logger=None):
"""
Check that the RPC library can be imported and set up the RPC http server connection
"""
self.logger = logger if logger else logging.getLogger('null')
self.address = address
self.rpc = None
try:
import jsonrpc
except ImportError:
self.logger.warning('NuRPC: jsonrpc library could not be imported')
else:
# rpc connection
self.JSONRPCException = jsonrpc.JSONRPCException
opts = dict(tuple(line.strip().replace(' ', '').split('='))
for line in open(config).readlines() if len(line.split('=')) == 2)
if 'rpcuser' not in opts.keys() or 'rpcpassword' not in opts.keys():
self.logger.error("NuRPC: RPC parameters could not be read")
else:
try:
self.rpc = jsonrpc.ServiceProxy("http://%s:%s@127.0.0.1:%s" %
(opts['rpcuser'], opts['rpcpassword'], 14002))
self.txfee = self.rpc.getinfo()['paytxfee']
except:
self.logger.error("NuRPC: RPC connection could not be established")
self.rpc = None
def pay(self, txout):
"""
Attempt to make the payment to the addresses specified in txout
:param txout: a json formatted 'sendtomany' object
:return:
"""
self.logger.info("attempting to make payout")
try:
self.rpc.sendmany("", txout)
self.logger.info("successfully sent payout: %s", txout)
return True
except AttributeError:
self.logger.error('NuRPC: client not initialized')
except self.JSONRPCException as e:
self.logger.error('NuRPC: unable to send payout: %s', e.error['message'])
except:
self.logger.error("NuRPC: unable to send payout (exception caught): %s", sys.exc_info()[1])
return False
def liquidity(self, bid, ask, identifier):
"""
Submit liquidity info through the rpc connection
:param bid:
:param ask:
:return:
tier:pair:exchange:botsessionid
Example of a valid identifier : 2:BTCNBT:ccedk:nubotsession3
"""
try:
self.rpc.liquidityinfo('B', bid, ask, self.address, identifier)
self.logger.info("successfully sent liquidity: buy: {0} sell: {1} "
"identifier: {2}".format(bid, ask, identifier))
return True
except AttributeError:
self.logger.error('NuRPC: liquidity client not initialized')
except self.JSONRPCException as e:
self.logger.error('NuRPC: unable to send liquidity: %s', e.error['message'])
except:
self.logger.error("NuRPC: unable to send liquidity (exception caught): %s", sys.exc_info()[1])
return False
class User(threading.Thread):
def __init__(self, key, address, unit, exchange, pricefeed, sampling, tolerance, logger=None):
threading.Thread.__init__(self)
self.key = key
self.active = True
self.address = address
self.balance = 0.0
self.pricefeed = pricefeed
self.unit = unit
self.exchange = exchange
self.tolerance = tolerance
self.sampling = sampling
self.last_errors = [""] * sampling
self.cost = {'ask': config._interest[repr(exchange)][unit]['bid']['rate'],
'bid': config._interest[repr(exchange)][unit]['ask']['rate']}
self.rate = {'ask': config._interest[repr(exchange)][unit]['bid']['rate'],
'bid': config._interest[repr(exchange)][unit]['ask']['rate']}
self.liquidity = {'ask': [[] for i in xrange(sampling)], 'bid': [[] for i in xrange(sampling)]}
self.credits = {
'ask': [[{'amount': 0.0, 'cost': 0.0}, {'amount': 0.0, 'cost': 0.0}, {'amount': 0.0, 'cost': 0.0}]
for i in xrange(sampling)],
'bid': [[{'amount': 0.0, 'cost': 0.0}, {'amount': 0.0, 'cost': 0.0}, {'amount': 0.0, 'cost': 0.0}]
for i in xrange(sampling)]}
self.lock = threading.Lock()
self.trigger = threading.Lock()
self.trigger.acquire()
self.response = ['m' for i in xrange(sampling)]
self.logger = logger if logger else logging.getLogger('null')
self.requests = []
self.daemon = True
self.cancel = False
self.history = []
self.page = 1
self.record()
self.bundle()
def record(self):
missings = self.response.count('m')
rejects = self.response.count('r')
amount = {'bid': [], 'ask': []}
norm = max(1.0, float(len(self.response) - missings - rejects))
for i in xrange(len(self.response)):
for side in ['bid', 'ask']:
stats = config._interest[repr(self.exchange)][self.unit][side]
if not amount[side] and self.response[i] == 'a':
amount[side] = [self.credits[side][i][j]['amount'] for j in xrange(3)]
if self.credits[side][i][0]['cost'] == stats['high'] \
or self.credits[side][i][1]['cost'] == stats['low']:
amount[side] = [self.credits[side][i][j]['amount'] for j in xrange(3)]
self.history.append(
{'time': int(time.time()), 'balance': self.balance, 'missings': missings, 'rejects': rejects,
'bid': amount['bid'], 'ask': amount['ask'], 'rate': self.rate})
if len(self.history) == 60 * 24 + 1:
with open('stats/%s.%s.%s.%d' % (logname, self.key, self.unit, self.page), 'w') as fo:
fo.write(json.dumps(self.history))
self.page += 1
self.history = self.history[-1:]
def bundle(self):
self.checkpoint = {'liquidity': self.liquidity.copy(), 'response': self.response[:],
'last_errors': self.last_errors[:], 'balance': self.balance}
def set(self, request, bid, ask, sign):
if len(self.requests) < 10: # don't accept more requests to avoid simple spamming
self.requests.append(({p: v[0] for p, v in request.items()}, sign, {'bid': bid, 'ask': ask}))
self.active = True
def run(self):
while True:
self.trigger.acquire()
self.lock.acquire()
res = 'm'
if self.requests:
requests = self.requests[:]
self.requests = []
for rid, request in enumerate(requests):
try:
orders = self.exchange.validate_request(self.key, self.unit, request[0], request[1])
except:
orders = {'error': 'exception caught: %s' % sys.exc_info()[1]}
if not 'error' in orders and not self.cancel:
valid = {'bid': [], 'ask': []}
price = self.pricefeed.price(self.unit,0.0)
last_error = ''
for order in orders:
deviation = 1.0 - min(order['price'], price) / max(order['price'], price)
if deviation <= self.tolerance:
span = 60.0 / config._sampling
et = int(time.time())
st = et - span
if 'closed' in order and order['closed'] < et:
et = order['closed']
if 'opened' in order and order['opened'] > st:
st = order['opened']
order['amount'] *= max(0.0, float(et - st) / span)
valid[order['type']].append([order['id'], order['amount'], request[2][order['type']]])
else:
self.last_errors.append(
'unable to validate request: order of deviates too much from current price')
for side in ['bid', 'ask']:
del self.liquidity[side][0]
self.liquidity[side].append(valid[side])
if last_error != "" and len(valid['bid'] + valid['ask']) == 0:
res = 'r'
self.last_errors.append(last_error)
self.logger.debug(
"unable to validate request %d/%d for user %s at exchange %s on unit %s: orders of deviate too much from current price",
rid + 1, len(self.requests), self.key, repr(self.exchange), self.unit)
else:
res = 'a'
self.last_errors.append("")
break
else:
if self.cancel:
orders = {'error': 'request handling canceled'}
res = 'r'
if rid + 1 == len(requests) or self.cancel:
self.last_errors.append("unable to validate request: " + orders['error'])
self.logger.warning(
"unable to validate request %d/%d for user %s at exchange %s on unit %s: %s",
rid + 1, len(requests), self.key, repr(self.exchange), self.unit, orders['error'])
for side in ['bid', 'ask']:
del self.liquidity[side][0]
self.liquidity[side].append([])
if self.cancel: break
else:
self.last_errors.append("no request received")
for side in ['bid', 'ask']:
del self.liquidity[side][0]
self.liquidity[side].append([])
self.response = self.response[1:] + [res]
del self.last_errors[0]
self.lock.release()
def validate(self):
self.cancel = False
try:
self.trigger.release()
except thread.error:
pass # user did not finish last request in time
def finish(self):
if self.active:
self.cancel = True
try:
self.lock.acquire()
self.lock.release()
except KeyboardInterrupt:
raise
def response(errcode=0, message='success'):
return {'code': errcode, 'message': message}
def register(params):
ret = response()
if set(params.keys()) == set(['address', 'key', 'name']):
user = params['key'][0]
name = params['name'][0]
address = params['address'][0]
if address[0] == 'B': # this is certainly not a proper check
if name in config._interest:
for slave in slaves:
slave.register(address, user, name)
if not user in keys:
lock.acquire()
keys[user] = {}
for unit in config._interest[name]:
keys[user][unit] = User(user, address, unit, _wrappers[name], pricefeed, config._sampling,
config._tolerance, logger)
keys[user][unit].start()
lock.release()
logger.info("new user %s on %s: %s" % (user, name, address))
else:
if keys[user].values()[0].address != address:
ret = response(9, "user already exists with different address: %s" % user)
else:
ret = response(8, "unknown exchange requested: %s" % name)
else:
ret = response(7, "invalid payout address: %s" % address)
else:
ret = response(6, "invalid registration data received: %s" % str(params))
return ret
def liquidity(params):
ret = response()
if set(params.keys() + ['user', 'sign', 'unit', 'ask', 'bid']) == set(params.keys()):
user = params.pop('user')[0]
sign = params.pop('sign')[0]
unit = params.pop('unit')[0]
try:
bid = float(params.pop('bid')[0])
ask = float(params.pop('ask')[0])
if user in keys:
if unit in keys[user]:
start_new_thread(keys[user][unit].set, (params, bid, ask, sign))
else:
ret = response(12, "unit for user %s not found: %s" % (user, unit))
else:
ret = response(11, "user not found: %s" % user)
except ValueError:
ret = response(10, "invalid cost information received: %s" % str(params))
return ret
def poolstats():
return {'liquidity': ([(0, 0)] + _liquidity)[-1], 'sampling': config._sampling, 'users': _active_users,
'credits': _round / config._sampling, 'validations': _round}
critical_message = ""
def userstats(user):
res = {'balance': 0.0, 'efficiency': 0.0, 'rejects': 0, 'missing': 0, 'message': critical_message}
res['units'] = {}
for unit in keys[user]:
checkpoint = keys[user][unit].checkpoint
if checkpoint['liquidity']['bid'].count([]) + checkpoint['liquidity']['ask'].count([]) < 2 * config._sampling:
credits = {
'bid': [{'amount': 0.0, 'cost': -1.0}, {'amount': 0.0, 'cost': -1.0}, {'amount': 0.0, 'cost': -1.0}],
'ask': [{'amount': 0.0, 'cost': -1.0}, {'amount': 0.0, 'cost': -1.0}, {'amount': 0.0, 'cost': -1.0}]}
last_error = ""
missing = checkpoint['response'].count('m')
rejects = checkpoint['response'].count('r')
res['balance'] += checkpoint['balance']
res['missing'] += missing
res['rejects'] += rejects
norm = max(1.0, float(keys[user][unit].sampling - missing - rejects))
for i in xrange(keys[user][unit].sampling):
if checkpoint['last_errors'][i] != "":
last_error = checkpoint['last_errors'][i]
for side in ['bid', 'ask']:
stats = config._interest[repr(keys[user][unit].exchange)][unit][side]
if credits[side][0]['cost'] < 0.0 and checkpoint['response'][i] == 'a':
for j in xrange(3):
credits[side][j]['amount'] = keys[user][unit].credits[side][i][j]['amount']
credits[side][j]['cost'] = 0.0
if keys[user][unit].credits[side][i][0]['cost'] == stats['high'] or \
keys[user][unit].credits[side][i][1]['cost'] == stats['low']:
for j in xrange(3):
credits[side][j]['amount'] = keys[user][unit].credits[side][i][j]['amount']
for side in ['bid', 'ask']:
stats = config._interest[repr(keys[user][unit].exchange)][unit][side]
credits[side][0]['cost'] = stats['high']
credits[side][1]['cost'] = stats['low']
res['units'][unit] = {'bid': credits['bid'],
'ask': credits['ask'],
'rate': keys[user][unit].rate,
'rejects': rejects,
'missing': missing,
'active': keys[user][unit].active,
'last_error': last_error}
if len(res['units']) > 0:
res['efficiency'] = 1.0 - (res['rejects'] + res['missing']) / float(config._sampling * len(res['units']))
return res
def collect(timeout):
for slave in slaves:
slave.collect(timeout)
for slave in slaves:
checkpoint = slave.finish()
if not 'error' in checkpoint:
for user in checkpoint:
for unit in checkpoint[user]:
for i in xrange(config._sampling):
if keys[user][unit].response[i] == 'm':
keys[user][unit].last_errors[i] = checkpoint[user][unit]['last_errors'][i]
if checkpoint[user][unit]['response'][i] != 'm':
keys[user][unit].response[i] = checkpoint[user][unit]['response'][i]
if checkpoint[user][unit]['response'][i] == 'a':
for side in ['bid', 'ask']:
keys[user][unit].liquidity[side][i] = checkpoint[user][unit]['liquidity'][side][
i]
for user in keys:
for unit in keys[user]:
keys[user][unit].bundle()
keys[user][unit].active = keys[user][unit].liquidity['bid'].count([]) + keys[user][unit].liquidity[
'ask'].count([]) < 2 * keys[user][unit].sampling
def checkpoints(params):
ret = {}
for user in params:
if user in keys:
for unit in keys[user]:
if keys[user][unit].active:
if not user in ret: ret[user] = {}
ret[user][unit] = keys[user][unit].checkpoint
return ret
def credit():
for name in config._interest:
for unit in config._interest[name]:
users = [k for k in keys if unit in keys[k] and repr(keys[k][unit].exchange) == name]
for user in users:
keys[user][unit].record()
keys[user][unit].rate['bid'] = 0.0
keys[user][unit].rate['ask'] = 0.0
for side in ['bid', 'ask']:
config._interest[name][unit][side]['low'] = 0.0
config._interest[name][unit][side]['high'] = 0.0
config._interest[name][unit][side]['orders'] = []
for sample in xrange(config._sampling):
config._interest[name][unit][side]['orders'].append([])
# payout variables
# maxrate = config._interest[name][unit][side]['rate']
submitted = []
for user in users:
keys[user][unit].credits[side][sample] = [{'amount': 0.0, 'cost': 0.0},
{'amount': 0.0, 'cost': 0.0},
{'amount': 0.0, 'cost': 0.0}]
submitted.extend([(user, order) for order in keys[user][unit].liquidity[side][sample]])
submitted.sort(key=lambda x: (x[1][2], x[1][0]))
orders = [submitted[i] for i in xrange(len(submitted)) if
i == 0 or submitted[i][1][0] != submitted[i - 1][1][0]]
mass = sum([order[1] for _, order in submitted])
imprate = float(config._interest[name][unit][side]['rate'])
if mass > 0:
maxrate = imprate * 10000000 / mass
target = min(mass, config._interest[name][unit][side]['target'])
maxlevel = int(ceil(mass / target))
pricelevels = sorted(
list(set([order[2] for _, order in orders if order[2] < maxrate])) + [maxrate, maxrate])
if len(pricelevels) < maxlevel + 2:
pricelevels += [maxrate] * (2 + maxlevel - len(pricelevels))
# calculate level
levelvolume = [0.0 for p in pricelevels]
for user, order in orders:
if order[2] <= maxrate:
for i, p in enumerate(pricelevels):
if order[2] < p or p == maxrate:
levelvolume[i] += order[1]
lower = mass - int(mass / target) * target
higher = int((mass / target) + 1) * target - mass
lvl = len(pricelevels) - 3
for i in xrange(1, len(levelvolume) - 1):
if levelvolume[i - 1] >= lower and levelvolume[i] >= config._interest[name][unit][side][
'target']:
lvl = i - 2
break
config._interest[name][unit][side]['low'] = pricelevels[lvl + 1]
config._interest[name][unit][side]['high'] = pricelevels[lvl + 2]
# collect user contribution
volume = [{user: 0.0 for user in users}, {user: 0.0 for user in users},
{user: 0.0 for user in users}]
for user, order in orders:
volume[2][user] += order[1]
if order[2] <= maxrate:
ulvl = pricelevels.index(order[2])
if ulvl < lvl + 1:
volume[0][user] += order[1]
if ulvl < lvl + 2 or pricelevels[lvl + 2] == maxrate:
volume[1][user] += order[1]
if sample == config._sampling - 1:
logger.debug('%s pricelevel %d [%.4f,%.4f]: %s', side, lvl, float(sum(volume[0].values())),
float(sum(volume[1].values())), " ".join([str(s) for s in pricelevels]))
logger.debug('%s pricevolumes [%.4f,%.4f]: %s', side, lower, higher,
" ".join([str(s) for s in levelvolume]))
# credit higher payout level
norm = float(sum(volume[1].values()))
for user in volume[1]:
if norm > 0 and volume[1][user] > 0:
price = pricelevels[lvl + 2]
contrib = min(volume[1][user], higher * volume[1][user] / norm)
payout = contrib * price
volume[0][user] -= contrib
volume[2][user] -= contrib
keys[user][unit].balance += payout / float(24 * 60 * config._sampling)
keys[user][unit].credits[side][sample][0] = {'amount': contrib, 'cost': price}
keys[user][unit].rate[side] += price * contrib / (volume[1][user] * config._sampling)
config._interest[name][unit][side]['orders'][sample].append(
{'amount': contrib, 'cost': price})
creditor.info("[%d/%d] %.8f %s %.8f %s %s %s %.2f high %s",
sample + 1, config._sampling, payout / float(24 * 60 * config._sampling),
user, contrib, side, name, unit, price * 100, keys[user][unit].address)
# credit lower payout level
norm = float(sum([max(0, v) for v in volume[0].values()]))
for user in volume[0]:
if norm > 0 and volume[0][user] > 0:
price = pricelevels[lvl + 1]
contrib = min(volume[0][user], lower * volume[0][user] / norm)
payout = contrib * price
volume[2][user] -= contrib
keys[user][unit].balance += payout / float(24 * 60 * config._sampling)
keys[user][unit].credits[side][sample][1] = {'amount': contrib, 'cost': price}
keys[user][unit].rate[side] += price * contrib / (volume[0][user] * config._sampling)
config._interest[name][unit][side]['orders'][sample].append(
{'amount': contrib, 'cost': price})
creditor.info("[%d/%d] %.8f %s %.8f %s %s %s %.2f low %s",
sample + 1, config._sampling, payout / float(24 * 60 * config._sampling),
user, contrib, side, name, unit, price * 100, keys[user][unit].address)
# mark zero payout level
for user in volume[2]:
if volume[2][user] > float(10 ** (-8)):
keys[user][unit].credits[side][sample][2] = {'amount': volume[2][user], 'cost': 0.0}
config._interest[name][unit][side]['orders'][sample].append(
{'amount': volume[2][user], 'cost': 0.0})
def pay(nud):
txout = {}
lock.acquire()
for user in keys:
for unit in keys[user]:
if not keys[user][unit].address in txout:
txout[keys[user][unit].address] = 0.0
txout[keys[user][unit].address] += keys[user][unit].balance
lock.release()
txfee = 0.01 if not nud.rpc else nud.txfee
txout = {k: v - nud.txfee for k, v in txout.items() if v - txfee > config._minpayout}
if txout:
sent = False
if config._autopayout:
sent = nud.pay(txout)
try:
filename = 'logs/%d.credit' % time.time()
out = open(filename, 'w')
out.write(json.dumps(txout))
out.close()
if not sent:
logger.info("successfully stored payout to %s: %s", filename, txout)
lock.acquire()
for user in keys:
for unit in keys[user]:
if keys[user][unit].address in txout and keys[user][unit].balance > 0.0:
creditor.info("[-] %.8f %s %s", keys[user][unit].balance, user, unit)
keys[user][unit].balance = 0.0
lock.release()
except:
logger.error("failed to store payout to %s: %s", filename, txout)
else:
logger.warning("not processing payouts because no valid balances were detected.")
def submit(nud):
curliquidity = {}
lock.acquire()
for user in keys:
for unit in keys[user]:
exchange = repr(keys[user][unit].exchange)
if exchange not in curliquidity:
curliquidity[exchange] = {}
if unit not in curliquidity[exchange]:
curliquidity[exchange][unit] = [0, 0]
for s in xrange(config._sampling):
curliquidity[exchange][unit][0] += sum([order[1] for order in
keys[user][unit].liquidity[
'bid'][-(s + 1)]])
curliquidity[exchange][unit][1] += sum([order[1] for order in
keys[user][unit].liquidity[
'ask'][-(s + 1)]])
lock.release()
for exchange in curliquidity:
for unit in curliquidity[exchange]:
liquidity = [curliquidity[exchange][unit][0] / float(config._sampling),
curliquidity[exchange][unit][1] / float(config._sampling)]
_liquidity.append(liquidity)
identifier = "1:{0}:{1}:{2}".format('NBT'+unit.upper(),
exchange,
config._pool_name)
nud.liquidity(liquidity[0], liquidity[1], identifier)
def sync():
ts = int(time.time() * 1000.0)
return {'time': ts, 'sync': 15000, 'round': _round}
class RequestHandler(SimpleHTTPServer.SimpleHTTPRequestHandler):
def do_POST(self):
if len(self.path) == 0:
self.send_response(404)
return
self.path = self.path[1:]
if self.path in ['register', 'liquidity', 'checkpoints']:
ctype, pdict = cgi.parse_header(self.headers.getheader('content-type'))
if ctype == 'application/x-www-form-urlencoded':
length = int(self.headers.getheader('content-length'))
params = cgi.parse_qs(self.rfile.read(length), keep_blank_values=1)
if self.path == 'register':
ret = register(params)
elif self.path == 'liquidity':
ret = liquidity(params)
elif self.path == 'checkpoints':
if _valflag:
ret = {'error': "validation in progress"}
else:
ret = checkpoints(params)
self.send_response(200)
self.send_header('Content-Type', 'application/json')
self.wfile.write("\n")
self.wfile.write(json.dumps(ret))
self.end_headers()
def do_GET(self):
if len(self.path.replace('/', '')) == 0:
self.send_response(200)
return
method = self.path[1:]
if master:
try:
content = json.dumps(master.get(method, trials=1, timeout=5))
self.send_response(200)
self.send_header('Content-Type', 'text/plain')
self.wfile.write("\n")
self.wfile.write(content)
self.end_headers()
except:
self.send_response(404)
elif 'loaderio' in method: # evil hack to support load tester (TODO)
self.send_response(200)
self.send_header('Content-Type', 'text/plain')
self.wfile.write("\n")
self.wfile.write(method.replace('/', ''))
self.end_headers()
elif method in ['status', 'exchanges', 'sync']:
self.send_response(200)
self.send_header('Content-Type', 'application/json')
self.wfile.write("\n")
if method == 'status':
self.wfile.write(json.dumps(poolstats()))
elif method == 'exchanges':
self.wfile.write(json.dumps(config._interest))
elif method == 'sync':
self.wfile.write(json.dumps(sync()))
self.end_headers()
elif method in keys:
self.send_response(200)
self.send_header('Content-Type', 'application/json')
self.wfile.write("\n")
self.wfile.write(json.dumps(userstats(method)))
self.end_headers()
elif '/' in method:
root = method.split('/')[0]
method = method.split('/')[1:]
if root == 'price':
price = {'price': pricefeed.price(method[0],0.0)}
if price['price']:
self.send_response(200)
self.send_header('Content-Type', 'application/json')
self.wfile.write("\n")
self.wfile.write(json.dumps(price))
self.end_headers()
else:
self.send_response(404)
elif root == 'info':
if len(method) == 2 and method[0] in config._interest and method[1] in config._interest[method[0]]:
self.send_response(200)
self.send_header('Content-Type', 'application/json')
self.wfile.write("\n")
self.wfile.write(json.dumps(config._interest[method[0]][method[1]]))
self.end_headers()
else:
self.send_response(404)
elif root == 'history':
try:
page = int(method[2])
if page == 0:
content = json.dumps(keys[method[0]][method[1]].history)
else:
content = open('stats/%s.%s.%s.%d' % (logname, method[0], method[1], page)).read()
content = fin.read()
self.send_response(200)
self.send_header('Content-Type', 'application/json')
self.wfile.write("\n")
self.wfile.write(content)
self.end_headers()
except:
self.send_response(404)
else:
self.send_response(404)
else:
self.send_response(404)
def log_message(self, format, *args):
pass
class ThreadingServer(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer):
pass
nud = NuRPC(config._nuconfig, config._grantaddress, logger)
if not nud.rpc:
logger.critical('Connection to Nu daemon could not be established, liquidity will NOT be sent!')
config._autopayout = False
httpd = ThreadingServer(("", config._port), RequestHandler)
sa = httpd.socket.getsockname()
logger.debug("serving on %s port %d", sa[0], sa[1])
start_new_thread(httpd.serve_forever, ())
if master:
_round = -1
ts = int(time.time() * 1000.0)
ret = master.get('sync', trials=3, timeout=15)
if not 'error' in ret:
_round = ret['round']
delay = (60000 - (ret['time'] % 60000)) - (int(time.time() * 1000.0) - ts) / 2
if delay <= 0:
logger.error("unable to synchronize time with master server: time difference to small")
logger.info("waiting %.2f seconds to synchronize with master server", delay / 1000.0)
time.sleep(delay / 1000.0)
else:
logger.error("unable to synchronize time with master server: %s", ret['message'])
elif slaves:
delay = max(float(60000 - (int(time.time() * 1000) % 60000)), 0.0)
if delay > 0.0:
logger.info("waiting %.2f seconds to synchronize with slave servers", delay / 1000.0)
time.sleep(delay / 1000.0)
lastcheckp = time.time()
lastcredit = time.time()
lastpayout = time.time()
lastsubmit = time.time()
while True:
try:
curtime = time.time()
# wait for validation round to end:
lock.acquire()
_active_users = 0
for user in keys:
active = False
for unit in keys[user]:
keys[user][unit].finish()
active = active or keys[user][unit].active
if active: _active_users += 1
lock.release()
# create checkpoints
if not slaves or curtime - lastcheckp >= 60:
collect(max(float(60 / config._sampling) - time.time() + curtime, 0.01) / 2.0)
lastcheckp = curtime
_valflag = False
if not master:
_round += 1
# send liquidity
if curtime - lastsubmit >= 5:
submit(nud)
lastsubmit = curtime
# credit requests
if curtime - lastcredit >= 60:
credit()
lastcredit = curtime
# make payout
if curtime - lastpayout >= 86400:
pay(nud)
lastpayout = curtime
else:
while True:
ret = master.get('sync', trials=1, timeout=1)
if 'error' in ret or ret['round'] == _round:
time.sleep(0.1)
continue
_round = ret['round']
break
# start new validation round
_valflag = True
lock.acquire()
for user in keys:
for unit in keys[user]:
keys[user][unit].validate()
lock.release()
if not master:
time.sleep(max(float(60 / config._sampling) - time.time() + curtime, 0))
except Exception as e:
logger.error('exception caught in main loop: %s', sys.exc_info()[1])
httpd.socket.close()
raise