Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Asyncio based on develop #261

Merged
merged 17 commits into from
Sep 4, 2014
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ The Counterparty protocol specification may be found at
<https://github.com/CounterpartyXCP/Counterparty>.

# Dependencies
* [Python 3](http://python.org)
* Python 3 packages: apsw, requests, appdirs, prettytable, python-dateutil, json-rpc, tornado, flask, Flask-HTTPAuth, pycoin, pyzmq(v2.2+), pycrypto (see [this link](https://github.com/CounterpartyXCP/counterpartyd/blob/master/pip-requirements.txt) for exact working versions)
* [Python 3.4](http://python.org)
* Python 3 packages: apsw, aiohttp, appdirs, prettytable, python-dateutil, json-rpc, tornado, flask, Flask-HTTPAuth, pycoin, pyzmq(v2.2+), pycrypto (see [this link](https://github.com/CounterpartyXCP/counterpartyd/blob/master/pip-requirements.txt) for exact working versions)
* Bitcoind

# Installation
Expand Down
14 changes: 9 additions & 5 deletions counterpartyd.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import binascii
from fractions import Fraction

import requests
import asyncio, aiohttp
import appdirs
from prettytable import PrettyTable

Expand Down Expand Up @@ -97,7 +97,7 @@ def market (give_asset, get_asset):

# Your Pending Orders Matches.
addresses = []
for bunch in bitcoin.get_wallet():
for bunch in util.aiorun(bitcoin.get_wallet()):
addresses.append(bunch[:2][0])
filters = [
('tx0_address', 'IN', addresses),
Expand Down Expand Up @@ -521,6 +521,7 @@ def generate_move_random_hash(move):
#patch up cmd.exe's "challenged" (i.e. broken/non-existent) UTF-8 logging
util_windows.fix_win32_unicode()

loop = asyncio.get_event_loop()
# Parse command-line arguments.
parser = argparse.ArgumentParser(prog=config.XCP_CLIENT, description='the reference implementation of the {} protocol'.format(config.XCP_NAME))
parser.add_argument('-V', '--version', action='version', version="{} v{}".format(config.XCP_CLIENT, config.VERSION_STRING))
Expand Down Expand Up @@ -1029,7 +1030,7 @@ def generate_move_random_hash(move):
totals = {}

print()
for bunch in bitcoin.get_wallet():
for bunch in util.aiorun(bitcoin.get_wallet()):
address, btc_balance = bunch[:2]
address_data = get_address(db, address=address)
balances = address_data['balances']
Expand Down Expand Up @@ -1064,7 +1065,7 @@ def generate_move_random_hash(move):

elif args.action == 'pending':
addresses = []
for bunch in bitcoin.get_wallet():
for bunch in util.aiorun(bitcoin.get_wallet()):
addresses.append(bunch[:2][0])
filters = [
('tx0_address', 'IN', addresses),
Expand Down Expand Up @@ -1113,7 +1114,10 @@ def generate_move_random_hash(move):
raise Exception("Blockchain backend (%s) not initialized! Aborting startup after %i tries." % (
config.BLOCKCHAIN_SERVICE_NAME, num_tries))

blocks.follow(db)
asyncio.async(asyncio.Task(blocks.follow(db)))
loop.run_forever() # Go do everything



else:
parser.print_help()
Expand Down
32 changes: 19 additions & 13 deletions lib/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import time
import json
import re
import requests
import asyncio, aiohttp
import collections
import logging
from logging import handlers as logging_handlers
Expand Down Expand Up @@ -202,32 +202,34 @@ def compose_transaction(db, name, params,
fee=None,
fee_provided=0):
tx_info = sys.modules['lib.{}'.format(name)].compose(db, **params)
return bitcoin.transaction(tx_info, encoding=encoding,
return util.aiorun(bitcoin.transaction(tx_info, encoding=encoding,
fee_per_kb=fee_per_kb,
regular_dust_size=regular_dust_size,
multisig_dust_size=multisig_dust_size,
op_return_value=op_return_value,
public_key_hex=pubkey,
allow_unconfirmed_inputs=allow_unconfirmed_inputs,
exact_fee=fee,
fee_provided=fee_provided)
fee_provided=fee_provided))

def sign_transaction(unsigned_tx_hex, private_key_wif=None):
return bitcoin.sign_tx(unsigned_tx_hex, private_key_wif=private_key_wif)
return util.aiorun(bitcoin.sign_tx(unsigned_tx_hex,
private_key_wif=private_key_wif))

def broadcast_transaction(signed_tx_hex):
if not config.TESTNET and config.BROADCAST_TX_MAINNET in ['bci', 'bci-failover']:
url = "https://blockchain.info/pushtx"
params = {'tx': signed_tx_hex}
response = requests.post(url, data=params)
if response.text.lower() != 'transaction submitted' or response.status_code != 200:
response = util.aiorun(aiohttp.request('POST', url, data=params))
data = util.aiorun(response.read())
if data.lower() != 'transaction submitted' or response.status != 200:
if config.BROADCAST_TX_MAINNET == 'bci-failover':
return bitcoin.broadcast_tx(signed_tx_hex)
return util.aiorun(bitcoin.broadcast_tx(signed_tx_hex))
else:
raise Exception(response.text)
return response.text
return data
else:
return bitcoin.broadcast_tx(signed_tx_hex)
return util.aiorun(bitcoin.broadcast_tx(signed_tx_hex))

def do_transaction(db, name, params, private_key_wif=None, **kwargs):
unsigned_tx = compose_transaction(db, name, params, **kwargs)
Expand All @@ -244,6 +246,8 @@ def __init__(self):
def run(self):
global current_api_status_code, current_api_status_response_json
db = util.connect_to_db(flags='SQLITE_OPEN_READONLY')
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

while True:
try:
Expand All @@ -256,9 +260,9 @@ def run(self):
# Check that the database has caught up with bitcoind.
if time.time() - self.last_database_check > 10 * 60: # Ten minutes since last check.
code = 11
bitcoin.bitcoind_check(db)
util.aiorun(bitcoin.bitcoind_check(db))
code = 12
util.database_check(db, bitcoin.get_block_count()) # TODO: If not reparse or rollback, once those use API.
util.database_check(db, util.aiorun(bitcoin.get_block_count())) # TODO: If not reparse or rollback, once those use API.
self.last_database_check = time.time()
except Exception as e:
exception_name = e.__class__.__name__
Expand All @@ -279,6 +283,8 @@ def run(self):
db = util.connect_to_db(flags='SQLITE_OPEN_READONLY')
app = flask.Flask(__name__)
auth = HTTPBasicAuth()
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

@auth.get_password
def get_pw(username):
Expand Down Expand Up @@ -393,7 +399,7 @@ def get_asset_info(assets):
# BTC and XCP.
if asset in [config.BTC, config.XCP]:
if asset == config.BTC:
supply = bitcoin.get_btc_supply(normalize=False)
supply = util.aiorun(bitcoin.get_btc_supply(normalize=False))
else:
supply = util.xcp_supply(db)

Expand Down Expand Up @@ -480,7 +486,7 @@ def get_blocks(block_indexes):

@dispatcher.add_method
def get_running_info():
latestBlockIndex = bitcoin.get_block_count()
latestBlockIndex = util.aiorun(bitcoin.get_block_count())

try:
util.database_check(db, latestBlockIndex)
Expand Down
87 changes: 45 additions & 42 deletions lib/bitcoin.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import decimal
import logging

import requests
import asyncio, aiohttp
from pycoin.ecdsa import generator_secp256k1, public_pair_for_secret_exponent
from pycoin.encoding import wif_to_tuple_of_secret_exponent_compressed, public_pair_to_sec, is_sec_compressed, EncodingError
from Crypto.Cipher import ARC4
Expand All @@ -41,84 +41,85 @@ def print_coin(coin):
return 'amount: {}; txid: {}; vout: {}; confirmations: {}'.format(coin['amount'], coin['txid'], coin['vout'], coin.get('confirmations', '?')) # simplify and make deterministic

def get_block_count():
return int(rpc('getblockcount', []))
count= yield from rpc('getblockcount', [])
return int(count)

def get_block_hash(block_index):
return rpc('getblockhash', [block_index])
return(yield from rpc('getblockhash', [block_index]))

def is_valid (address):
return rpc('validateaddress', [address])['isvalid']
return((yield from rpc('validateaddress', [address]))['isvalid'])

def is_mine (address):
return rpc('validateaddress', [address])['ismine']
return(yield from rpc('validateaddress', [address])['ismine'])

def send_raw_transaction (tx_hex):
return rpc('sendrawtransaction', [tx_hex])
return(yield from rpc('sendrawtransaction', [tx_hex]))

def get_raw_transaction (tx_hash, json=True):
if json:
return rpc('getrawtransaction', [tx_hash, 1])
return(yield from rpc('getrawtransaction', [tx_hash, 1]))
else:
return rpc('getrawtransaction', [tx_hash])
return(yield from rpc('getrawtransaction', [tx_hash]))

def get_block (block_hash):
return rpc('getblock', [block_hash])
return(yield from rpc('getblock', [block_hash]))

def get_block_hash (block_index):
return rpc('getblockhash', [block_index])
return(yield from rpc('getblockhash', [block_index]))

def decode_raw_transaction (unsigned_tx_hex):
return rpc('decoderawtransaction', [unsigned_tx_hex])
return(yield from rpc('decoderawtransaction', [unsigned_tx_hex]))

def get_wallet ():
for group in rpc('listaddressgroupings', []):
addressgroupings = yield from rpc('listaddressgroupings', [])
for group in addressgroupings:
for bunch in group:
yield bunch

def get_mempool ():
return rpc('getrawmempool', [])
return(yield from rpc('getrawmempool', []))

def get_info():
return rpc('getinfo', [])
return(yield from rpc('getinfo', []))

def bitcoind_check (db):
"""Checks blocktime of last block to see if {} Core is running behind.""".format(config.BTC_NAME)
block_count = rpc('getblockcount', [])
block_hash = rpc('getblockhash', [block_count])
block = rpc('getblock', [block_hash])
block_count = yield from rpc('getblockcount', [])
block_hash = yield from rpc('getblockhash', [block_count])
block = yield from rpc('getblock', [block_hash])
time_behind = time.time() - block['time'] # How reliable is the block time?!
if time_behind > 60 * 60 * 2: # Two hours.
raise exceptions.BitcoindError('Bitcoind is running about {} seconds behind.'.format(round(time_behind)))

def connect (host, payload, headers):
global bitcoin_rpc_session
if not bitcoin_rpc_session: bitcoin_rpc_session = requests.Session()
def connect (url, payload, headers):
TRIES = 12
for i in range(TRIES):
try:
response = bitcoin_rpc_session.post(host, data=json.dumps(payload), headers=headers, verify=config.BACKEND_RPC_SSL_VERIFY)
response = yield from asyncio.Task(aiohttp.request('POST', url, data=json.dumps(payload),
headers=headers))
if i > 0: print('Successfully connected.', file=sys.stderr)
return response
except requests.exceptions.SSLError as e:
raise e
except requests.exceptions.ConnectionError:
logging.debug('Could not connect to Bitcoind. (Try {}/{})'.format(i+1, TRIES))
except aiohttp.ConnectionError:
print('Could not connect to Bitcoind. Sleeping for five seconds. (Try {}/{})'.format(i+1, TRIES), file=sys.stderr)
time.sleep(5)
return None

def wallet_unlock ():
getinfo = get_info()
getinfo = yield from get_info()
if 'unlocked_until' in getinfo:
if getinfo['unlocked_until'] >= 60:
return True # Wallet is unlocked for at least the next 60 seconds.
else:
passphrase = getpass.getpass('Enter your Bitcoind[‐Qt] wallet passhrase: ')
print('Unlocking wallet for 60 (more) seconds.')
rpc('walletpassphrase', [passphrase, 60])
yield from rpc('walletpassphrase', [passphrase, 60])
else:
return True # Wallet is unencrypted.

@asyncio.coroutine
def rpc (method, params):
starttime = time.time()
headers = {'content-type': 'application/json'}
payload = {
"method": method,
Expand All @@ -135,13 +136,13 @@ def rpc (method, params):
f.write(payload)
'''

response = connect(config.BACKEND_RPC, payload, headers)
response = yield from connect(config.BACKEND_RPC, payload, headers)
if response == None:
if config.TESTNET: network = 'testnet'
else: network = 'mainnet'
raise exceptions.BitcoindRPCError('Cannot communicate with {} Core. ({} is set to run on {}, is {} Core?)'.format(config.BTC_NAME, config.XCP_CLIENT, network, config.BTC_NAME))
elif response.status_code not in (200, 500):
raise exceptions.BitcoindRPCError(str(response.status_code) + ' ' + response.reason)
elif response.status not in (200, 500):
raise exceptions.BitcoindRPCError(str(response.status) + ' ' + response.reason)

'''
if config.UNITTEST:
Expand All @@ -150,15 +151,15 @@ def rpc (method, params):
'''

# Return result, with error handling.
response_json = response.json()
response_json = yield from response.json()
if 'error' not in response_json.keys() or response_json['error'] == None:
return response_json['result']
elif response_json['error']['code'] == -5: # RPC_INVALID_ADDRESS_OR_KEY
raise exceptions.BitcoindError('{} Is txindex enabled in {} Core?'.format(response_json['error'], config.BTC_NAME))
elif response_json['error']['code'] == -4: # Unknown private key (locked wallet?)
# If address in wallet, attempt to unlock.
address = params[0]
validate_address = rpc('validateaddress', [address])
validate_address = yield from rpc('validateaddress', [address])
if validate_address['isvalid']:
if validate_address['ismine']:
raise exceptions.BitcoindError('Wallet is locked.')
Expand All @@ -168,7 +169,7 @@ def rpc (method, params):
raise exceptions.AddressError('Invalid address.')
elif response_json['error']['code'] == -1 and response_json['message'] == 'Block number out of range.':
time.sleep(10)
return rpc('getblockhash', [block_index])
return(yield from rpc('getblockhash', [block_index]))

# elif config.UNITTEST:
# print(method)
Expand Down Expand Up @@ -384,9 +385,10 @@ def sort_unspent_txouts(unspent, allow_unconfirmed_inputs):
return unspent

def private_key_to_public_key (private_key_wif):
# allowable_wif_prefixes = [
allowable_wif_prefixes = [ b'\x80', b'\xef' ]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still need to fix this---these values should be set in config.py.

try:
secret_exponent, compressed = wif_to_tuple_of_secret_exponent_compressed(private_key_wif, is_test=config.TESTNET)
secret_exponent, compressed = wif_to_tuple_of_secret_exponent_compressed(
private_key_wif, allowable_wif_prefixes=allowable_wif_prefixes)
except EncodingError:
raise exceptions.AltcoinSupportError('pycoin: unsupported WIF prefix')
public_pair = public_pair_for_secret_exponent(generator_secp256k1, secret_exponent)
Expand Down Expand Up @@ -431,7 +433,7 @@ def transaction (tx_info, encoding='auto', fee_per_kb=config.DEFAULT_FEE_PER_KB,
if config.UNITTEST:
private_key_wif = config.UNITTEST_PRIVKEY[source]
else:
private_key_wif = rpc('dumpprivkey', [source])
private_key_wif = yield from rpc('dumpprivkey', [source])

# Derive public key.
public_key_hex = private_key_to_public_key(private_key_wif)
Expand Down Expand Up @@ -459,7 +461,8 @@ def transaction (tx_info, encoding='auto', fee_per_kb=config.DEFAULT_FEE_PER_KB,

# Check that the source is in wallet.
if not config.UNITTEST and encoding in ('multisig') and not public_key:
if not rpc('validateaddress', [source])['ismine']:
ismine = yield from rpc('validateaddress', [source])
if not ismine['ismine']:
raise exceptions.AddressError('Not one of your Bitcoin addresses:', source)

# Check that the destination output isn't a dust output.
Expand Down Expand Up @@ -508,7 +511,7 @@ def chunks(l, n):
outputs_size = ((25 + 9) * len(destination_outputs)) + (len(data_array) * data_output_size)

# Get inputs.
unspent = get_unspent_txouts(source, normalize=True)
unspent = yield from get_unspent_txouts(source, normalize=True)
unspent = sort_unspent_txouts(unspent, allow_unconfirmed_inputs)
logging.debug('Sorted UTXOs: {}'.format([print_coin(coin) for coin in unspent]))

Expand Down Expand Up @@ -572,7 +575,7 @@ def sign_tx (unsigned_tx_hex, private_key_wif=None):
raise exceptions.TransactionError('Could not sign transaction with pybtctool.')

else: # Assume source is in wallet and wallet is unlocked.
result = rpc('signrawtransaction', [unsigned_tx_hex])
result = yield from rpc('signrawtransaction', [unsigned_tx_hex])
if result['complete']:
signed_tx_hex = result['hex']
else:
Expand Down Expand Up @@ -613,9 +616,9 @@ def get_unspent_txouts(address, normalize=False):
with open(CURR_DIR + '/../test/listunspent.test.json', 'r') as listunspent_test_file: # HACK
wallet_unspent = json.load(listunspent_test_file)
return [output for output in wallet_unspent if output['address'] == address]

if rpc('validateaddress', [address])['ismine']:
wallet_unspent = rpc('listunspent', [0, 999999])
rpcv = yield from rpc('validateaddress', [address])
if rpcv['ismine']:
wallet_unspent = yield from rpc('listunspent', [0, 999999])
return [output for output in wallet_unspent if output['address'] == address]
else:
return blockchain.listunspent(address)
Expand Down