Run txs in parallel, fine tune output
miohtama committed Feb 28, 2018
1 parent 5ea8011 commit 26e4ae7
Showing 3 changed files with 66 additions and 17 deletions.
ico/amlreclaim.py: 67 changes (54 additions, 13 deletions)
@@ -6,19 +6,21 @@
import csv
import logging
from collections import namedtuple
from typing import List, Optional
from typing import List, Optional, Tuple

from ico.utils import validate_ethereum_address, check_succesful_tx
from web3.contract import Contract

from ico.utils import validate_ethereum_address, check_succesful_tx
from ico.utils import check_multiple_succesful_txs


logger = logging.getLogger(__name__)

#: A parsed CSV input entry
Entry = namedtuple("Entry", ("address", "label"))


def reclaim_address(token: Contract, entry: Entry, tx_params: dict) -> int:
def reclaim_address(token: Contract, entry: Entry, tx_params: dict) -> Tuple[int, str]:
"""Reclsaim tokens for a single participant.
:param token: Token contract we reclaim
@@ -34,32 +36,52 @@ def reclaim_address(token: Contract, entry: Entry, tx_params: dict) -> int:

if token.call().balanceOf(entry.address) == 0:
logger.info("%s: looks like already reclaimed %s", entry.address, entry.label)
return 0
return 0, None

txid = token.transact(tx_params).transferToOwner(entry.address)
logger.info("%s: reclaiming %s in txid %s", entry.address, entry.label, txid)

check_succesful_tx(token.web3, txid)

return 1
return 1, txid


def reclaim_all(token: Contract, reclaim_list: List[Entry], tx_params: dict) -> int:
"""Reclaim all tokens from the given input sheet.
Process transactions in parallel to speed up the operation.
:param tx_params: Ethereum transaction parameters to use
"""

total_reclaimed = 0

tx_to_confirm = [] # List of txids to confirm
tx_batch_size = 16 # How many transactions to confirm at once
web3 = token.web3

for entry in reclaim_list:
total_reclaimed += reclaim_address(token, entry, tx_params)
ops, txid = reclaim_address(token, entry, tx_params)
total_reclaimed += ops

if not txid:
# Already reclaimed
continue

tx_to_confirm.append(txid)

# Confirm N transactions when batch max size is reached
if len(tx_to_confirm) >= tx_batch_size:
check_multiple_succesful_txs(web3, tx_to_confirm)
tx_to_confirm = []

# Confirm dangling transactions
check_multiple_succesful_txs(web3, tx_to_confirm)

return total_reclaimed
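check_multiple_succesful_txs is imported from ico.utils and its implementation is not part of this diff. As a rough sketch of what such a batch confirmation helper could look like (an assumption, not the project's actual code), it would poll until every transaction in the batch has a mined receipt:

import time

def wait_for_batch(web3, txids, poll_interval=2.0):
    """Hypothetical stand-in for check_multiple_succesful_txs.

    Blocks until every txid in the batch is mined and raises if any
    transaction reverted. The real helper in ico.utils may handle
    timeouts, logging and gas accounting differently.
    """
    pending = set(txids)
    while pending:
        for txid in list(pending):
            receipt = web3.eth.getTransactionReceipt(txid)  # None until mined
            if receipt is None:
                continue
            # Post-Byzantium receipts carry a status field; 0 means the tx reverted
            if receipt.get("status") == 0:
                raise RuntimeError("Transaction failed: {}".format(txid))
            pending.discard(txid)
        if pending:
            time.sleep(poll_interval)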


def prepare_csv(stream, address_key, label_key) -> List[Entry]:
"""Processa CSV reclaim file.
"""Process CSV reclaim file.
Make sure all Ethereum addresses are valid. Filter out duplicates.
:param token: Token contract
:param owner: ETH account set as the owner of the token
@@ -72,20 +94,36 @@ def prepare_csv(stream, address_key, label_key) -> List[Entry]:
reader = csv.DictReader(stream)
rows = [row for row in reader]
output_rows = []
uniq = set()

# Prevalidate addresses
# Here we do it inline and skip addresses that are not valid.
for idx, row in enumerate(rows):
addr = row[address_key].strip()
label = row[label_key].strip()

if not addr:
# Empty cell / row
continue

if not addr.startswith("0x"):
addr = "0x" + addr

try:
if addr:
validate_ethereum_address(addr)
except ValueError as e:
logger.error("Invalid Ethereum address on row:", idx + 1, "address:", addr, "reason:", str(e), "external_id:",
row[label_key])
logger.error("Invalid Ethereum address on row:%d address:%s label:%s reason:%s", idx+1, addr, label, str(e))
continue

addr = addr.lower()

if addr in uniq:
logger.warn("Address has duplicates: %s", addr)
continue

uniq.add(addr)

output_row = Entry(address=addr, label=label)
output_rows.append(output_row)
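For reference, a minimal usage sketch of prepare_csv with an in-memory CSV. The placeholder addresses and labels below are made up, and the example assumes they pass the validate_ethereum_address check in ico.utils:

import io
from ico.amlreclaim import prepare_csv

# Two rows that normalise to the same address, the second one missing the 0x prefix
csv_data = io.StringIO(
    "address,label\n"
    "0x1111111111111111111111111111111111111111,alice\n"
    "1111111111111111111111111111111111111111,alice again\n"
)

entries = prepare_csv(csv_data, address_key="address", label_key="label")

# The prefix is added, the address lowercased and the duplicate dropped,
# so a single Entry for alice should remain
for entry in entries:
    print(entry.address, entry.label)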

@@ -97,7 +135,10 @@ def count_tokens_to_reclaim(token, rows: List[Entry]):

total = 0

for entry in rows:
for idx, entry in enumerate(rows):
total += token.call().balanceOf(entry.address)

if idx % 20 == 0:
logger.info("Prechecking balances %d / %d", idx, len(rows))

return total
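A short sketch of how this precheck is meant to be combined with the token's decimals before reclaiming. The token contract proxy and the rows list are assumed to be set up elsewhere, as the command in the next file does:

from ico.amlreclaim import count_tokens_to_reclaim

# Assumes `token` is a deployed web3 Contract and `rows` comes from prepare_csv()
raw_total = count_tokens_to_reclaim(token, rows)  # sum of balanceOf() in raw token units
decimals = token.call().decimals()
print("About to reclaim %f tokens" % (raw_total / 10**decimals))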
ico/cmd/amlreclaim.py: 7 changes (5 additions, 2 deletions)
@@ -36,6 +36,7 @@ def main(chain, owner_address, token, csv_file, address_column, label_column, ga
--address-column="address" \
--label-column="label" \
--csv-file=test.csv
"""

setup_console_logging()
@@ -63,7 +64,7 @@ def main(chain, owner_address, token, csv_file, address_column, label_column, ga

decimals = token.call().decimals()
logger.info("Total supply is %s", token.call().totalSupply() / (10**decimals))
logger.info("Owner account token balance is", token.call().balanceOf(owner_address))
logger.info("Owner account token balance is %s", token.call().balanceOf(owner_address) / (10**decimals))

if gas_price:
gas_price = int(gas_price) * 10**9
@@ -76,12 +77,14 @@ def main(chain, owner_address, token, csv_file, address_column, label_column, ga
"gasPrice": gas_price,
}

logger.info("Using gas price of %f", gas_price / 10**9, "GWei")
logger.info("Using gas price of %f GWei", gas_price / 10**9)

logger.info("Reading data from %s", csv_file)
with open(csv_file, "rt") as inp:
rows = prepare_csv(inp, address_column, label_column)

logger.info("Total %s rows", len(rows))

amount = count_tokens_to_reclaim(token, rows) / 10**decimals
logger.info("Claiming total %f tokens", amount)

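The gas price handling above takes a GWei figure from the command line and stores the equivalent wei value in the transaction parameters. A small worked example of the conversion, using a made-up 4 GWei figure:

gas_price_gwei = 4                            # hypothetical --gas-price value
gas_price_wei = int(gas_price_gwei) * 10**9   # value stored under "gasPrice"
assert gas_price_wei == 4000000000
print("Using gas price of %f GWei" % (gas_price_wei / 10**9))  # prints 4.000000 GWei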
ico/logging.py: 9 changes (7 additions, 2 deletions)
@@ -11,7 +11,7 @@ def setup_console_logging(log_level=None):
Don't use logging settings from INI, but use hardcoded defaults.
"""

formatter = logging.Formatter("[%(asctime)s] [%(name)s %(funcName)s] %(message)s") # same as default
formatter = logging.Formatter("[%(asctime)s] [%(name)s] %(message)s") # same as default

# setup `RainbowLoggingHandler`
# and quiet some logs for the test output
@@ -25,7 +25,12 @@ def setup_console_logging(log_level=None):
log_level = log_level or getattr(logging, env_level.upper())
logger.setLevel(log_level)

# Limit requests noisiness
# Limit dependency package noisiness
logger = logging.getLogger("requests.packages.urllib3.connectionpool")
logger.setLevel(logging.ERROR)

logger = logging.getLogger("anyconfig")
logger.setLevel(logging.ERROR)

logger = logging.getLogger("populus.compilation")
logger.setLevel(logging.ERROR)
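The same silencing pattern extends to any other chatty dependency. A minimal usage sketch, assuming the ico package and its console handler dependency are installed; the extra "urllib3" logger below is only an example:

import logging

from ico.logging import setup_console_logging

setup_console_logging()  # colored console output with the hardcoded defaults above

# Quiet one more noisy dependency the same way the helper does internally
logging.getLogger("urllib3").setLevel(logging.ERROR)

logging.getLogger(__name__).info("Logging configured")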
