Skip to content

Commit

Permalink
Merge pull request #4 from ethereum/subprocess_mining
Browse files Browse the repository at this point in the history
Subprocess mining
  • Loading branch information
heikoheiko committed May 9, 2015
2 parents 302ac50 + 498e281 commit 17a560a
Show file tree
Hide file tree
Showing 12 changed files with 462 additions and 117 deletions.
76 changes: 25 additions & 51 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,69 +1,43 @@
## Builds pyethapp from github in a python 2.7 docker container.
#
## Builds pyethapp from GitHub in a python 2.7.9 docker container.
## Note: base image, do not use in a production environment
##
## Build via:
## Build with:
#
# `docker build --rm --no-cache -t pyethapp:latest
# docker build -t pyethapp .
#
## the '--rm --no-cache' above avoids stacking up and or reusing intermediary builds.
##
## Run via:
## Run with:
#
# `docker run --rm \
# -p 0.0.0.0:30303:30303 \
# -p 0.0.0.0:30303:30303/udp \
# -v <host dir>:/data \
# pyethapp:latest`
# docker run -p 30303:30303 -p 30303:30303/udp pyethapp
#
## Note, that <host dir> given in '-v <host dir>:/data' above needs
## to be writable for the user inside the container (uid:1000/gid:1000).

FROM python:2.7.9

RUN apt-get update && \
apt-get install -y git-core && \
apt-get clean
RUN apt-get update
RUN apt-get install -y git-core

RUN git clone https://github.com/ethereum/pyrlp && cd pyrlp &&\
git checkout develop && \
pip install -r requirements.txt && \
python setup.py install
RUN git clone https://github.com/ethereum/pyrlp
WORKDIR pyrlp
RUN git checkout develop
RUN pip install -e .

RUN git clone https://github.com/ethereum/pydevp2p && cd pydevp2p &&\
pip install -r requirements.txt && \
python setup.py install
RUN git clone https://github.com/ethereum/pydevp2p
WORKDIR pydevp2p
RUN pip install -e .

RUN git clone https://github.com/ethereum/pyethereum && cd pyethereum &&\
git checkout develop && \
pip install -r requirements.txt && \
python setup.py install
RUN git clone https://github.com/ethereum/pyethereum
WORKDIR pyethereum
RUN git checkout develop
RUN pip install -e .

RUN git clone https://github.com/ethereum/pyethapp && cd pyethapp && \
python setup.py install
RUN git clone https://github.com/ethereum/pyethapp
WORKDIR pyethapp
RUN pip install -e .

# Fix debian's ridicolous gevent-breaking constant removal
# Fix debian's ridiculous gevent-breaking constant removal
# (e.g. https://github.com/hypothesis/h/issues/1704#issuecomment-63893295):
RUN sed -i 's/PROTOCOL_SSLv3/PROTOCOL_SSLv23/g' /usr/local/lib/python2.7/site-packages/gevent/ssl.py

# For docker builds from git, we add the last commit to the version string:
RUN cd pyethapp &&\
sed -i "s/client_version = 'pyethapp/client_version = 'pyethapp_$(git rev-parse HEAD| cut -c 1-6)/g" /usr/local/lib/python2.7/site-packages/pyethapp*/pyethapp/app.py

RUN adduser pyethuser

# We use a startscript to wrap the '-d /data' parameter.
# in theory this wouldn't be necessary and we could use the following two lines:
#ENTRYPOINT ["/usr/local/bin/pyethapp", "-d", "/data/pyethdata"]
#CMD ["run"]
# unfortunately, in practice it doesn't though.
RUN echo '#!/bin/bash\nset -e\n\nexec /usr/local/bin/pyethapp -d /data/pyethdata "$@"' > /home/pyethuser/startscript.sh

RUN chmod +x /home/pyethuser/startscript.sh &&\
chown -R pyethuser. /home/pyethuser

USER pyethuser

ENTRYPOINT ["/home/pyethuser/startscript.sh"]
CMD ["run"]

# mount host directory via "-v /path/to/host/dir:/data" - needs u+w or g+w for 1000
VOLUME ["/data"]
ENTRYPOINT ["pyethapp"]
20 changes: 19 additions & 1 deletion pyethapp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# ############# version ##################
from pkg_resources import get_distribution, DistributionNotFound
import os.path
import subprocess
try:
_dist = get_distribution('pyethapp')
# Normalize case for Windows systems
Expand All @@ -11,7 +12,24 @@
# not installed, but there is another version that *is*
raise DistributionNotFound
except DistributionNotFound:
__version__ = 'undef'
__version__ = None
else:
__version__ = _dist.version
if not __version__:
try:
# try to parse from setup.py
for l in open(os.path.join(__path__[0], '..', 'setup.py')):
if l.startswith("version = '"):
__version__ = l.split("'")[1]
break
finally:
if not __version__:
__version__ = 'undefined'
# add git revision and commit status
try:
rev = subprocess.check_output(['git', 'rev-parse', 'HEAD'])
is_dirty = len(subprocess.check_output(['git', 'diff', '--shortstat']).strip())
__version__ += '-' + rev[:4] + '-dirty' if is_dirty else ''
except:
pass
# ########### endversion ##################
1 change: 1 addition & 0 deletions pyethapp/accounts.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ def pubkey(self):
def address(self):
a = privtoaddr(self.privkey)
assert len(a) == 20
assert isinstance(a, bytes)
return a

def __repr__(self):
Expand Down
24 changes: 20 additions & 4 deletions pyethapp/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import config as konfig
from db_service import DBService
from jsonrpc import JSONRPCServer
from pow_service import PoWService
from accounts import AccountsService
from pyethapp import __version__
import utils
Expand All @@ -26,9 +27,8 @@
log = slogging.get_logger('app')


services = [DBService, AccountsService, NodeDiscovery, PeerManager, ChainService,
services = [DBService, AccountsService, NodeDiscovery, PeerManager, ChainService, PoWService,
JSONRPCServer, Console]
services += utils.load_contrib_services()


class EthApp(BaseApp):
Expand All @@ -46,12 +46,18 @@ class EthApp(BaseApp):
help='data directory')
@click.option('log_config', '--log_config', '-l', multiple=False, type=str,
help='log_config string: e.g. ":info,eth:debug')
@click.option('--log-json/--log-no-json', default=False,
help='log as structured json output')
@click.option('bootstrap_node', '--bootstrap_node', '-b', multiple=False, type=str,
help='single bootstrap_node as enode://pubkey@host:port')
@click.option('mining_pct', '--mining_pct', '-m', multiple=False, type=int, default=0,
help='pct cpu used for mining')
@click.pass_context
def app(ctx, alt_config, config_values, data_dir, log_config):
def app(ctx, alt_config, config_values, data_dir, log_config, bootstrap_node, log_json, mining_pct):

# configure logging
log_config = log_config or ':info'
slogging.configure(log_config)
slogging.configure(log_config, log_json=log_json)

# data dir default or from cli option
data_dir = data_dir or konfig.default_data_dir
Expand Down Expand Up @@ -79,6 +85,13 @@ def app(ctx, alt_config, config_values, data_dir, log_config):
raise BadParameter('Config parameter must be of the form "a.b.c=d" where "a.b.c" '
'specifies the parameter to set and d is a valid yaml value '
'(example: "-c jsonrpc.port=5000")')
if bootstrap_node:
config['discovery']['bootstrap_nodes'] = [bytes(bootstrap_node)]

if mining_pct > 0:
config['pow']['activated'] = True
config['pow']['cpu_pct'] = int(min(100, mining_pct))

ctx.obj = {'config': config}


Expand All @@ -99,6 +112,9 @@ def run(ctx, dev):
log.warn("can't get and add login name to client_version")
pass

# dump config
konfig.dump_config(ctx.obj['config'])

# register services
for service in services:
assert issubclass(service, BaseService)
Expand Down
7 changes: 5 additions & 2 deletions pyethapp/eth_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,13 @@ def create(self, proto, chain_difficulty, chain_head_hash, genesis_hash):
network_id = proto.service.app.config['eth'].get('network_id', proto.network_id)
return [proto.version, network_id, chain_difficulty, chain_head_hash, genesis_hash]

class gettransactions(BaseProtocol.command):
class newblockhashes(BaseProtocol.command):

"unused"
"""
NewBlockHashes [+0x01: P, hash1: B_32, hash2: B_32, ...] Specify one or more new blocks which have appeared on the network. Including hashes that the sending peer could reasonable be considered to know that the receiving node is aware of is considered Bad Form, and may reduce the reputation of the sending node. Including hashes that the sending node later refuses to honour with a proceeding GetBlocks message is considered Bad Form, and may reduce the reputation of the sending node.
"""
cmd_id = 1
structure = rlp.sedes.CountableList(rlp.sedes.binary)

class transactions(BaseProtocol.command):

Expand Down
66 changes: 54 additions & 12 deletions pyethapp/eth_service.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
# https://github.com/ethereum/go-ethereum/wiki/Blockpool
import time
from ethereum.utils import privtoaddr, sha3
from ethereum.utils import sha3
import rlp
from rlp.utils import encode_hex
from ethereum import processblock
from synchronizer import Synchronizer
from ethereum.slogging import get_logger
from ethereum.chain import Chain
from ethereum.blocks import Block
from ethereum.blocks import Block, VerificationFailed
from ethereum.transactions import Transaction
from devp2p.service import WiredService
import eth_protocol
Expand Down Expand Up @@ -69,6 +69,8 @@ class ChainService(WiredService):
config = None
block_queue_size = 1024
transaction_queue_size = 1024
processed_gas = 0
processed_elapsed = 0

def __init__(self, app):
self.config = app.config
Expand All @@ -86,9 +88,21 @@ def __init__(self, app):
self.add_blocks_lock = False
self.add_transaction_lock = gevent.lock.Semaphore()
self.broadcast_filter = DuplicatesFilter()
self.on_new_head_cbs = []
self.on_new_head_candidate_cbs = []

@property
def is_syncing(self):
return self.synchronizer.synctask is not None

def _on_new_head(self, block):
pass
for cb in self.on_new_head_cbs:
cb(block)
self._on_new_head_candidate() # we implicitly have a new head_candidate

def _on_new_head_candidate(self):
for cb in self.on_new_head_candidate_cbs:
cb(self.chain.head_candidate)

def add_transaction(self, tx, origin=None):
assert isinstance(tx, Transaction)
Expand All @@ -97,15 +111,24 @@ def add_transaction(self, tx, origin=None):
success = self.chain.add_transaction(tx)
self.add_transaction_lock.release()
if success:
self._on_new_head_candidate()
self.broadcast_transaction(tx, origin=origin) # asap

def add_block(self, t_block, proto):
"adds a block to the block_queue and spawns _add_block if not running"
self.block_queue.put((t_block, proto)) # blocks if full
if not self.add_blocks_lock:
self.add_blocks_lock = True
self.add_blocks_lock = True # need to lock here (ctx switch is later)
gevent.spawn(self._add_blocks)

def add_mined_block(self, block):
log.debug('adding mined block', block=block)
assert block.check_pow()
if self.chain.add_block(block):
log.info('added', block=block, ts=time.time())
assert block == self.chain.head
self.broadcast_newblock(block, chain_difficulty=block.chain_difficulty())

def knows_block(self, block_hash):
"if block is in chain or in queue"
if block_hash in self.chain:
Expand All @@ -117,7 +140,10 @@ def knows_block(self, block_hash):
return False

def _add_blocks(self):
log.debug('add_blocks', qsize=self.block_queue.qsize())
log.debug('add_blocks', qsize=self.block_queue.qsize(),
add_tx_lock=self.add_transaction_lock.locked())
assert self.add_blocks_lock is True
self.add_transaction_lock.acquire()
try:
while not self.block_queue.empty():
t_block, proto = self.block_queue.peek() # peek: knows_block while processing
Expand All @@ -129,6 +155,7 @@ def _add_blocks(self):
log.warn('missing parent', block=t_block)
self.block_queue.get()
continue
# FIXME, this is also done in validation and in synchronizer for new_blocks
if not t_block.header.check_pow():
log.warn('invalid pow', block=t_block, FIXME='ban node')
self.block_queue.get()
Expand All @@ -137,20 +164,29 @@ def _add_blocks(self):
st = time.time()
block = t_block.to_block(db=self.chain.db)
elapsed = time.time() - st
log.debug('deserialized', elapsed='%.2fs' % elapsed,
gas_used=block.gas_used, gpsec=int(block.gas_used / elapsed))
log.debug('deserialized', elapsed='%.4fs' % elapsed,
gas_used=block.gas_used, gpsec=self.gpsec(block.gas_used, elapsed))
except processblock.InvalidTransaction as e:
log.warn('invalid transaction', block=t_block, error=e, FIXME='ban node')
gevent.sleep(0.001)
self.block_queue.get()
continue
except VerificationFailed as e:
log.warn('verification failed', error=e, FIXME='ban node')
self.block_queue.get()
continue

if self.chain.add_block(block):
log.info('added', block=block)
self.block_queue.get()
log.info('added', block=block, ts=time.time())
self.block_queue.get() # remove block from queue (we peeked only)
gevent.sleep(0.001)

finally:
self.add_blocks_lock = False
self.add_transaction_lock.release()

def gpsec(self, gas_spent=0, elapsed=0):
self.processed_gas += gas_spent
self.processed_elapsed += elapsed
return int(self.processed_gas / (0.001 + self.processed_elapsed))

def broadcast_newblock(self, block, chain_difficulty=None, origin=None):
if not chain_difficulty:
Expand Down Expand Up @@ -243,7 +279,13 @@ def on_receive_getblockhashes(self, proto, child_block_hash, count):

last = child_block_hash
while len(found) < max_hashes:
last = rlp.decode_lazy(self.chain.db.get(last))[0][0]
try:
last = rlp.decode_lazy(self.chain.db.get(last))[0][0]
except KeyError:
# this can happen if we started a chain download, which did not complete
# should not happen if the hash is part of the canonical chain
log.warn('KeyError in getblockhashes', hash=last)
break
if last:
found.append(last)
else:
Expand Down

0 comments on commit 17a560a

Please sign in to comment.