Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for debit notes #219

Merged
merged 12 commits into from Jan 25, 2021
8 changes: 7 additions & 1 deletion examples/low-level-api/list-offers.py
Expand Up @@ -39,7 +39,13 @@ def main():
enable_default_logger()
try:
asyncio.get_event_loop().run_until_complete(
asyncio.wait_for(list_offers(Configuration(), subnet_tag=subnet,), timeout=4,)
asyncio.wait_for(
list_offers(
Configuration(),
subnet_tag=subnet,
),
timeout=4,
)
)
except TimeoutError:
pass
Expand Down
5 changes: 4 additions & 1 deletion handbook_gen/handbook_gen.py
Expand Up @@ -27,7 +27,10 @@ def get_md_files(md_root):
root = md_root
return sorted(
[
(dirname, sorted([f for f in files if f.endswith(".md")]),)
(
dirname,
sorted([f for f in files if f.endswith(".md")]),
)
for dirname, _, files in os.walk(root)
]
)
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Expand Up @@ -42,7 +42,7 @@ cli = ['fire', 'rich']


[tool.poetry.dev-dependencies]
black = "^19.10b0"
black = "^20.8b1"
pytest = "^5.4.3"
portray = "^1"
pytest-asyncio = "^0.12.0"
Expand Down
10 changes: 8 additions & 2 deletions tests/executor/test_payment_platforms.py
Expand Up @@ -128,7 +128,10 @@ async def mock_create_allocation(_self, model):

with pytest.raises(_StopExecutor):
async with Executor(
package=mock.Mock(), budget=10.0, driver="matching-driver", network="matching-network",
package=mock.Mock(),
budget=10.0,
driver="matching-driver",
network="matching-network",
) as executor:
async for _ in executor.submit(worker=mock.Mock(), data=mock.Mock()):
pass
Expand All @@ -146,7 +149,10 @@ async def test_driver_network_case_insensitive(monkeypatch, _mock_create_allocat

with pytest.raises(_StopExecutor):
async with Executor(
package=mock.Mock(), budget=10.0, driver="dRiVeR", network="NeTwOrK",
package=mock.Mock(),
budget=10.0,
driver="dRiVeR",
network="NeTwOrK",
) as executor:
async for _ in executor.submit(worker=mock.Mock(), data=mock.Mock()):
pass
Expand Down
73 changes: 62 additions & 11 deletions yapapi/executor/__init__.py
Expand Up @@ -48,12 +48,16 @@
else:
from async_exit_stack import AsyncExitStack # type: ignore

DEBIT_NOTE_MIN_TIMEOUT: Final[int] = 30 # in seconds
"Shortest debit note acceptance timeout the requestor will accept."

DEBIT_NOTE_ACCEPTANCE_TIMEOUT_PROP: Final[str] = "golem.com.payment.debit-notes.acceptance-timeout"

CFG_INVOICE_TIMEOUT: Final[timedelta] = timedelta(minutes=5)
"Time to receive invoice from provider after tasks ended."

DEFAULT_NETWORK = "rinkeby"
DEFAULT_DRIVER = "zksync"
DEFAULT_NETWORK = "rinkeby"

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -205,7 +209,7 @@ async def _submit(

# Building offer
builder = DemandBuilder()
builder.add(Activity(expiration=self._expires, multi_activity=True,))
builder.add(Activity(expiration=self._expires, multi_activity=True))
builder.add(NodeInfo(subnet_tag=self._subnet))
if self._subnet:
builder.ensure(f"({NodeInfoKeys.subnet_tag}={self._subnet})")
Expand Down Expand Up @@ -254,7 +258,7 @@ async def process_invoices() -> None:
amount=invoice.amount,
)
)
allocation = self._allocation_for_invoice(invoice)
allocation = self._get_allocation(invoice)
try:
await invoice.accept(amount=invoice.amount, allocation=allocation)
except CancelledError:
Expand All @@ -279,6 +283,33 @@ async def process_invoices() -> None:
if payment_closing and not agreements_to_pay:
break

# TODO Consider processing invoices and debit notes together
async def process_debit_notes() -> None:
async for debit_note in self._payment_api.incoming_debit_notes():
if debit_note.agreement_id in agreements_to_pay:
emit(
events.DebitNoteReceived(
agr_id=debit_note.agreement_id,
amount=debit_note.total_amount_due,
note_id=debit_note.debit_note_id,
)
)
allocation = self._get_allocation(debit_note)
try:
await debit_note.accept(
amount=debit_note.total_amount_due, allocation=allocation
)
except CancelledError:
raise
except Exception:
emit(
events.PaymentFailed(
agr_id=debit_note.agreement_id, exc_info=sys.exc_info() # type: ignore
)
)
if payment_closing and not agreements_to_pay:
break

async def accept_payment_for_agreement(agreement_id: str, *, partial: bool = False) -> None:
emit(events.PaymentPrepared(agr_id=agreement_id))
inv = invoices.get(agreement_id)
Expand All @@ -287,7 +318,7 @@ async def accept_payment_for_agreement(agreement_id: str, *, partial: bool = Fal
emit(events.PaymentQueued(agr_id=agreement_id))
return
del invoices[agreement_id]
allocation = self._allocation_for_invoice(inv)
allocation = self._get_allocation(inv)
await inv.accept(amount=inv.amount, allocation=allocation)
emit(
events.PaymentAccepted(
Expand Down Expand Up @@ -344,6 +375,19 @@ async def find_offers() -> None:
prop_id=proposal.id, reason="No common payment platforms"
)
)
timeout = proposal.props.get(DEBIT_NOTE_ACCEPTANCE_TIMEOUT_PROP)
if timeout:
if timeout < DEBIT_NOTE_MIN_TIMEOUT:
with contextlib.suppress(Exception):
await proposal.reject()
emit(
events.ProposalRejected(
prop_id=proposal.id,
reason="Debit note acceptance timeout too short",
)
)
else:
builder.properties[DEBIT_NOTE_ACCEPTANCE_TIMEOUT_PROP] = timeout
await proposal.respond(builder.properties, builder.constraints)
emit(events.ProposalResponded(prop_id=proposal.id))
except CancelledError:
Expand Down Expand Up @@ -476,12 +520,19 @@ async def worker_starter() -> None:
process_invoices_job = loop.create_task(process_invoices())
wait_until_done = loop.create_task(work_queue.wait_until_done())
worker_starter_task = loop.create_task(worker_starter())
debit_notes_job = loop.create_task(process_debit_notes())
# Py38: find_offers_task.set_name('find_offers_task')

get_offers_deadline = datetime.now(timezone.utc) + self._conf.get_offers_timeout
get_done_task: Optional[asyncio.Task] = None
services.update(
{find_offers_task, process_invoices_job, wait_until_done, worker_starter_task,}
{
find_offers_task,
process_invoices_job,
wait_until_done,
worker_starter_task,
debit_notes_job,
}
)
cancelled = False

Expand Down Expand Up @@ -643,18 +694,18 @@ def _get_common_payment_platforms(self, proposal: rest.market.OfferProposal) ->
}
return req_platforms.intersection(prov_platforms)

def _allocation_for_invoice(self, invoice: rest.payment.Invoice) -> rest.payment.Allocation:
def _get_allocation(
self, item: Union[rest.payment.DebitNote, rest.payment.Invoice]
) -> rest.payment.Allocation:
try:
return next(
allocation
for allocation in self._budget_allocations
if allocation.payment_address == invoice.payer_addr
and allocation.payment_platform == invoice.payment_platform
if allocation.payment_address == item.payer_addr
and allocation.payment_platform == item.payment_platform
)
except:
raise RuntimeError(
f"No allocation for {invoice.payment_platform} {invoice.payer_addr}."
)
raise RuntimeError(f"No allocation for {item.payment_platform} {item.payer_addr}.")

async def __aenter__(self) -> "Executor":
stack = self._stack
Expand Down
10 changes: 8 additions & 2 deletions yapapi/executor/ctx.py
Expand Up @@ -241,7 +241,10 @@ def send_file(self, src_path: str, dst_path: str):
self._pending_steps.append(_SendFile(self._storage, src_path, dst_path))

def run(
self, cmd: str, *args: Iterable[str], env: Optional[Dict[str, str]] = None,
self,
cmd: str,
*args: Iterable[str],
env: Optional[Dict[str, str]] = None,
):
"""Schedule running a command.

Expand Down Expand Up @@ -311,7 +314,10 @@ def build(cls, mode=None, limit=None, fmt=None) -> "CaptureContext":

@classmethod
def _build(
cls, mode: CaptureMode, limit: Optional[int] = None, fmt: Optional[str] = None,
cls,
mode: CaptureMode,
limit: Optional[int] = None,
fmt: Optional[str] = None,
) -> "CaptureContext":
cap_fmt: Optional[CaptureFormat] = CaptureFormat(fmt) if fmt else None
return cls(mode=mode, fmt=cap_fmt, limit=limit)
Expand Down
6 changes: 6 additions & 0 deletions yapapi/executor/events.py
Expand Up @@ -127,6 +127,12 @@ class AgreementTerminated(AgreementEvent):
reason: dict


@dataclass
class DebitNoteReceived(AgreementEvent):
note_id: str
amount: str


@dataclass
class PaymentAccepted(AgreementEvent):
inv_id: str
Expand Down
3 changes: 2 additions & 1 deletion yapapi/executor/task.py
Expand Up @@ -30,7 +30,8 @@ class Task(Generic[TaskData, TaskResult]):
ids: ClassVar[Iterator[int]] = itertools.count(1)

def __init__(
self, data: TaskData,
self,
data: TaskData,
):
"""Create a new Task object.

Expand Down
1 change: 1 addition & 0 deletions yapapi/log.py
Expand Up @@ -120,6 +120,7 @@ def enable_default_logger(
events.AgreementConfirmed: "Agreement approved by provider",
events.AgreementRejected: "Agreement rejected by provider",
events.AgreementTerminated: "Agreement terminated",
events.DebitNoteReceived: "Debit note received",
events.PaymentAccepted: "Payment accepted", # by who?
events.PaymentFailed: "Payment failed",
events.PaymentPrepared: "Payment prepared",
Expand Down
3 changes: 2 additions & 1 deletion yapapi/props/__init__.py
Expand Up @@ -53,7 +53,8 @@ class Activity(Model):
default=None, metadata={"key": "golem.srv.comp.expiration"}
)
multi_activity: Optional[bool] = field(
default=None, metadata={"key": "golem.srv.caps.multi-activity"},
default=None,
metadata={"key": "golem.srv.caps.multi-activity"},
)
"""Whether client supports multi_activity (executing more than one activity per agreement).
"""
Expand Down
5 changes: 4 additions & 1 deletion yapapi/props/base.py
Expand Up @@ -98,7 +98,10 @@ def from_properties(cls: Type[ME], props: Props) -> ME:
initialized from the same dictionary and all models will only load their own data.
"""
field_map = dict(
(f.metadata["key"], _PyField(name=f.name, type=f.type, required=f.default is MISSING),)
(
f.metadata["key"],
_PyField(name=f.name, type=f.type, required=f.default is MISSING),
)
for f in fields(cls)
if "key" in f.metadata
)
Expand Down
38 changes: 35 additions & 3 deletions yapapi/rest/configuration.py
Expand Up @@ -52,6 +52,32 @@ def __init__(self, event_date=None, invoice_id=None, local_vars_configuration=No
ya_payment.models.InvoiceReceivedEvent = _InvoiceReceivedEventWithDate # type: ignore


class _DebitNoteReceivedEventWithDate(
ya_payment.models.debit_note_received_event.DebitNoteReceivedEvent
):
"""A more correct model for DebitNoteReceivedEvent message."""

openapi_types = {
"event_date": "datetime",
"debit_note_id": "str",
}

attribute_map = {
"event_date": "eventDate",
"debit_note_id": "debitNoteId",
}

def __init__(self, event_date=None, debit_note_id=None, local_vars_configuration=None):
super().__init__(
debit_note_id=debit_note_id, local_vars_configuration=local_vars_configuration
)
self.event_date = event_date


ya_payment.models.debit_note_received_event.DebitNoteReceivedEvent = _DebitNoteReceivedEventWithDate # type: ignore
ya_payment.models.DebitNoteReceivedEvent = _DebitNoteReceivedEventWithDate # type: ignore


class Configuration(object):
"""
REST API's setup and top-level access utility.
Expand Down Expand Up @@ -118,19 +144,25 @@ def market(self) -> ya_market.ApiClient:
"""Return a REST client for the Market API."""
cfg = ya_market.Configuration(host=self.market_url)
return ya_market.ApiClient(
configuration=cfg, header_name="authorization", header_value=f"Bearer {self.app_key}",
configuration=cfg,
header_name="authorization",
header_value=f"Bearer {self.app_key}",
)

def payment(self) -> ya_payment.ApiClient:
"""Return a REST client for the Payment API."""
cfg = ya_payment.Configuration(host=self.payment_url)
return ya_payment.ApiClient(
configuration=cfg, header_name="authorization", header_value=f"Bearer {self.app_key}",
configuration=cfg,
header_name="authorization",
header_value=f"Bearer {self.app_key}",
)

def activity(self) -> ya_activity.ApiClient:
"""Return a REST client for the Activity API."""
cfg = ya_activity.Configuration(host=self.activity_url)
return ya_activity.ApiClient(
configuration=cfg, header_name="authorization", header_value=f"Bearer {self.app_key}",
configuration=cfg,
header_name="authorization",
header_value=f"Bearer {self.app_key}",
)
13 changes: 9 additions & 4 deletions yapapi/rest/market.py
Expand Up @@ -62,7 +62,7 @@ async def details(self) -> AgreementDetails:

async def confirm(self) -> bool:
"""Sign and send the agreement to the provider and then wait for it to be approved.

:return: True if the agreement has been confirmed, False otherwise
"""
await self._api.confirm_agreement(self._id)
Expand All @@ -77,7 +77,8 @@ async def terminate(self, reason: dict) -> bool:

try:
await self._api.terminate_agreement(
self._id, request_body=reason,
self._id,
request_body=reason,
)
logger.debug("terminateAgreement(%s) returned successfully", self._id)
return True
Expand Down Expand Up @@ -135,7 +136,8 @@ async def respond(self, props: dict, constraints: str) -> str:
async def create_agreement(self, timeout=timedelta(hours=1)) -> Agreement:
"""Create an Agreement based on this Proposal."""
proposal = models.AgreementProposal(
proposal_id=self.id, valid_to=datetime.now(timezone.utc) + timeout,
proposal_id=self.id,
valid_to=datetime.now(timezone.utc) + timeout,
)
api: RequestorApi = self._subscription._api
agreement_id = await api.create_agreement(proposal)
Expand All @@ -154,7 +156,10 @@ class Subscription(object):
"""Mid-level interface to REST API's Subscription model."""

def __init__(
self, api: RequestorApi, subscription_id: str, _details: Optional[models.Demand] = None,
self,
api: RequestorApi,
subscription_id: str,
_details: Optional[models.Demand] = None,
):
self._api: RequestorApi = api
self._id: str = subscription_id
Expand Down