Skip to content

Commit

Permalink
Set timeout for escrow transaction check
Browse files Browse the repository at this point in the history
Closes #14

Signed-off-by: alfred richardsn <rchrdsn@protonmail.ch>
  • Loading branch information
r4rdsn committed Mar 15, 2020
1 parent 95c36c1 commit 88c00c1
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 37 deletions.
3 changes: 2 additions & 1 deletion .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ ORDERS_LIMIT_COUNT=10
ORDER_DURATION_LIMIT=30

# Escrow
ESCROW_FEE_PERCENTS=5
ESCROW_ENABLED=true
ESCROW_FEE_PERCENTS=5
CHECK_TIMEOUT_HOURS=24
WIF_FILENAME=/run/secrets/wif.json
6 changes: 5 additions & 1 deletion locale/en/LC_MESSAGES/bot.po
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ msgid ""
msgstr ""
"Project-Id-Version: TellerBot\n"
"Report-Msgid-Bugs-To: rchrdsn@protonmail.ch\n"
"POT-Creation-Date: 2020-03-15 18:25+0300\n"
"POT-Creation-Date: 2020-03-15 19:18+0300\n"
"PO-Revision-Date: 2019-10-03 14:00+0300\n"
"Last-Translator: alfred richardsn <rchrdsn@protonmail.ch>\n"
"Language: en\n"
Expand All @@ -18,6 +18,10 @@ msgstr ""
"Content-Transfer-Encoding: 8bit\n"
"Generated-By: Babel 2.8.0\n"

#: src/escrow/blockchain/__init__.py
msgid "check_timeout {hours}"
msgstr "Transaction check took longer than {hours} hours, so escrow was cancelled."

#: src/escrow/blockchain/__init__.py
msgid "transaction_passed {currency}"
msgstr "Transaction has passed. I'll notify should you get {currency}."
Expand Down
6 changes: 5 additions & 1 deletion locale/ru/LC_MESSAGES/bot.po
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ msgid ""
msgstr ""
"Project-Id-Version: TellerBot\n"
"Report-Msgid-Bugs-To: rchrdsn@protonmail.ch\n"
"POT-Creation-Date: 2020-03-15 18:25+0300\n"
"POT-Creation-Date: 2020-03-15 19:18+0300\n"
"PO-Revision-Date: 2019-02-24 04:46+0300\n"
"Last-Translator: alfred richardsn <rchrdsn@protonmail.ch>\n"
"Language: ru\n"
Expand All @@ -19,6 +19,10 @@ msgstr ""
"Content-Transfer-Encoding: 8bit\n"
"Generated-By: Babel 2.8.0\n"

#: src/escrow/blockchain/__init__.py
msgid "check_timeout {hours}"
msgstr "Проверка транзакции длилась дольше {hours} часов, поэтому эскроу отменено."

#: src/escrow/blockchain/__init__.py
msgid "transaction_passed {currency}"
msgstr "Транзакция прошла. Я оповещу когда вы должны будете получить {currency}."
Expand Down
65 changes: 49 additions & 16 deletions src/escrow/blockchain/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
import typing
from abc import ABC
from abc import abstractmethod
from asyncio import create_task # type: ignore
from asyncio import create_task
from asyncio import get_running_loop
from decimal import Decimal
from time import time

Expand All @@ -28,6 +29,7 @@
from bson.objectid import ObjectId

from src.bot import tg
from src.config import config
from src.database import database
from src.i18n import i18n

Expand Down Expand Up @@ -105,6 +107,24 @@ def trx_url(self, trx_id: str) -> str:
"""Get URL on transaction with ID ``trx_id`` on explorer."""
return self.explorer.format(trx_id)

def create_queue_member(
self, **kwargs
) -> typing.Optional[typing.Mapping[str, typing.Any]]:
"""Create queue member from keyword arguments if it is not timeouted.
Schedule timeout handler and add it to queue member, return None if
queue member is timeouted.
"""
loop = get_running_loop()
timedelta = kwargs["transaction_time"] - time()
delay = timedelta + config.CHECK_TIMEOUT_HOURS
callback = self.check_timeout(kwargs["offer_id"])
if delay <= 0:
create_task(callback)
return None
kwargs["timeout_handler"] = loop.call_later(delay, create_task, callback)
return kwargs

async def check_transaction(
self,
offer_id: ObjectId,
Expand All @@ -116,32 +136,45 @@ async def check_transaction(
transaction_time: float,
):
"""Add transaction in ``self._queue`` to be checked."""
self._queue.append(
{
"offer_id": offer_id,
"from_address": from_address,
"amount_with_fee": amount_with_fee,
"amount_without_fee": amount_without_fee,
"asset": asset,
"memo": memo,
"transaction_time": transaction_time,
}
queue_member = self.create_queue_member(
offer_id=offer_id,
from_address=from_address,
amount_with_fee=amount_with_fee,
amount_without_fee=amount_without_fee,
asset=asset,
memo=memo,
transaction_time=transaction_time,
)
if not queue_member:
return
self._queue.append(queue_member)
# Start streaming if not already streaming
if len(self._queue) == 1:
await self.start_streaming()

def remove_from_queue(self, offer_id: ObjectId) -> bool:
"""Remove transaction with specified ``offer_id`` value from ``self._queue``.
async def check_timeout(self, offer_id: ObjectId) -> None:
"""Timeout transaction check.
:param offer_id: ``_id`` of escrow offer.
:return: True if transaction was found and False otherwise.
"""
for queue_member in self._queue:
if queue_member["offer_id"] == offer_id:
self._queue.remove(queue_member)
return True
return False
break
offer = await database.escrow.find_one_and_delete({"_id": offer_id})
await database.escrow_archive.insert_one(offer)
await tg.send_message(
offer["init"]["id"],
i18n("check_timeout {hours}", locale=offer["init"]["locale"]).format(
hours=config.CHECK_TIMEOUT_HOURS
),
)
await tg.send_message(
offer["counter"]["id"],
i18n("check_timeout {hours}", locale=offer["counter"]["locale"]).format(
hours=config.CHECK_TIMEOUT_HOURS
),
)

async def _confirmation_callback(
self,
Expand Down
37 changes: 19 additions & 18 deletions src/escrow/blockchain/golos_blockchain.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
import functools
import json
import typing
from asyncio import create_task # type: ignore
from asyncio import get_running_loop # type: ignore
from asyncio import sleep # type: ignore
from asyncio import create_task
from asyncio import get_running_loop
from asyncio import sleep
from calendar import timegm
from datetime import datetime
from decimal import Decimal
Expand Down Expand Up @@ -74,19 +74,19 @@ async def connect(self):
else:
address = offer["counter"]["send_address"]
amount = offer["sum_sell"].to_decimal()
queue.append(
{
"offer_id": offer["_id"],
"from_address": address,
"amount_with_fee": offer["sum_fee_up"].to_decimal(),
"amount_without_fee": amount,
"asset": offer[offer["type"]],
"memo": offer["memo"],
"transaction_time": offer["transaction_time"],
}
queue_member = self.create_queue_member(
offer_id=offer["_id"],
from_address=address,
amount_with_fee=offer["sum_fee_up"].to_decimal(),
amount_without_fee=amount,
asset=offer[offer["type"]],
memo=offer["memo"],
transaction_time=offer["transaction_time"],
)
if min_time is None or offer["transaction_time"] < min_time:
min_time = offer["transaction_time"]
if queue_member is not None:
queue.append()
if min_time is None or offer["transaction_time"] < min_time:
min_time = offer["transaction_time"]
if not queue:
return
func = functools.partial(
Expand Down Expand Up @@ -179,9 +179,9 @@ async def _start_streaming(self):
)
if is_confirmed:
self._queue.remove(req)
if not self._queue:
await loop.run_in_executor(None, self._stream.rpc.close)
return
if not self._queue:
await loop.run_in_executor(None, self._stream.rpc.close)
return
response = await loop.run_in_executor(None, self._stream.rpc.ws.recv)
response_json = json.loads(response)
if "error" in response_json:
Expand All @@ -205,6 +205,7 @@ async def _check_operation(
continue
if op["to"] != self.address or op["from"] != req["from_address"]:
continue
req["timeout_handler"].cancel()
refund_reasons = set()
if asset != req["asset"]:
refund_reasons.add("asset")
Expand Down

0 comments on commit 88c00c1

Please sign in to comment.