Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Continuous distribution #545

Open
wants to merge 33 commits into
base: jean/economic-model-sigmoid-based
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
18f1784
removed legacy code and made sure peer list is kept after iterations
jeandemeusy Jul 4, 2024
c456480
Remove unused imports
jeandemeusy Jul 4, 2024
28dd0b5
better subgraph handling
jeandemeusy Jul 4, 2024
517ba2b
more cleanup
jeandemeusy Jul 5, 2024
ca4ec24
implementation of asyncio queues
jeandemeusy Jul 5, 2024
5ed9a82
asyncloop class and queue generating / consuming tasks
jeandemeusy Jul 8, 2024
00ea7a3
handling task generation properly
jeandemeusy Jul 8, 2024
ca074d0
few test and renaming
jeandemeusy Jul 8, 2024
9f2f50a
Asyncloop test
jeandemeusy Jul 8, 2024
53b399b
fix in Dockerfile
jeandemeusy Jul 8, 2024
8c221ab
Update ct-app/test/components/test_asyncloop.py
jeandemeusy Jul 8, 2024
8d00813
minor fixes
jeandemeusy Jul 8, 2024
c530cd7
minor fixes
jeandemeusy Jul 8, 2024
9a09e7c
db storing
jeandemeusy Jul 9, 2024
e67fdb4
fix message count bug, and account for message value
jeandemeusy Jul 10, 2024
b21a789
path to config file in helm chart
jeandemeusy Jul 10, 2024
f027b47
Merge branch 'jean/economic-model-sigmoid-based' into jean/continuous…
jeandemeusy Jul 10, 2024
8a8be09
Merge branch 'jean/economic-model-sigmoid-based' into jean/continuous…
jeandemeusy Jul 10, 2024
6d702e7
Merge branch 'jean/economic-model-sigmoid-based' into jean/continuous…
jeandemeusy Jul 10, 2024
38ed42e
imports refactor and inbox check
jeandemeusy Jul 11, 2024
cb83dab
fixes
jeandemeusy Jul 11, 2024
4714d5c
auto message parse / formating
jeandemeusy Jul 12, 2024
7701f1d
increase inbox capacity and prepare for staging tests
jeandemeusy Jul 12, 2024
80ca14a
fix delays
jeandemeusy Jul 12, 2024
b04fad4
increase load on staging
jeandemeusy Jul 12, 2024
0468ed8
reduce db access and metric
jeandemeusy Jul 14, 2024
f5a6d64
fix
jeandemeusy Jul 14, 2024
ff9a38e
message count metric and fixes
jeandemeusy Jul 14, 2024
e063e63
increase load on staging, use 0x1245 as inbox tag
jeandemeusy Jul 16, 2024
4a1b4f1
adjust staging load
jeandemeusy Jul 16, 2024
527e1ed
send message without waiting for result
jeandemeusy Jul 16, 2024
4572d83
increase staging inbox size
jeandemeusy Jul 16, 2024
f6a1e27
reduce load in staging
jeandemeusy Jul 18, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# cache
__pycache__/
.pytest_cache/
.ruff_cache/

# envs
.venv*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,38 +4,43 @@
# =============================================================================
flags:
core:
healthcheck: 60
checkSubgraphURLs: 300
getTicketPrice: 120
aggregatePeers: 300
getTopologyData: 300
getSubgraphData: 300
getRegisteredNodes: 300
getNFTHolders: 300
getPeersRewards: 300
healthcheck: 10
rotateSubgraphs: 300
peersRewards: 300
ticketParameters: 120
jeandemeusy marked this conversation as resolved.
Show resolved Hide resolved

connectedPeers: 300
registeredNodes: 300
topology: 300
NFTHolders: 300

applyEconomicModel: 120
distributeRewards: 1

node:
healthcheck: 60
retrievePeers: 300
retrieveIncomingChannels: 600
retrieveOutgoingChannels: 600
retrieveBalances: 900

openChannels: 300
fundChannels: 300
closeOldChannels: 300
closePendingChannels: 1800
fundChannels: 300
closeIncomingChannels: ~
closeIncomingChannels: 'off'

jeandemeusy marked this conversation as resolved.
Show resolved Hide resolved
getTotalChannelFunds: 900

consume: 'on'

peer:
relayMessage: 'off'

# =============================================================================
#
# =============================================================================
economicModel:
minSafeAllowance: -1
NFTThreshold: 30000
intervals: 1800 # in seconds
winningProbability: 1

legacy:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,37 +5,42 @@
flags:
core:
healthcheck: 10
checkSubgraphURLs: 30
getTicketPrice: 30
aggregatePeers: 30
getTopologyData: 30
getSubgraphData: 30
getRegisteredNodes: 30
getNFTHolders: 30
getPeersRewards: 30
rotateSubgraphs: 30
peersRewards: 30
ticketParameters: 30

connectedPeers: 30
registeredNodes: 30
topology: 30
NFTHolders: 30

applyEconomicModel: 30
distributeRewards: 1

node:
healthcheck: 10
retrievePeers: 30
retrieveIncomingChannels: 30
retrieveOutgoingChannels: 30
retrieveBalances: 30

openChannels: 300
closeOldChannels: ~
closePendingChannels: ~
fundChannels: 300
closeIncomingChannels: ~
closeOldChannels: 'off'
closePendingChannels: 'off'
closeIncomingChannels: 'off'

getTotalChannelFunds: 300
consume: 'on'

peer:
relayMessage: 'on'

# =============================================================================
#
# =============================================================================
economicModel:
minSafeAllowance: -1
NFTThreshold: ~
intervals: 1800 # in seconds
winningProbability: 1

legacy:
Expand Down
21 changes: 21 additions & 0 deletions ct-app/.coveragerc
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
[report]
exclude_also =
def __repr__
def __str__
if self\.debug
self.debug
self.info
self.warning
self.error

if 0:
if __name__ == .__main__.:

def print_prefix


omit =
test/*
hoprd_api.py
graphql_providers.py
message_queue.py
10 changes: 10 additions & 0 deletions ct-app/.env.example
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
PGHOST="<PGHOST>"
PGPORT="<PGPORT>"
PGUSER="<PGUSER"
PGPASSWORD="<PGPASSWORD"
PGDATABASE="<PGDATABASE>"

SUBGRAPH_API_KEY="<SUBGRAPH_API_KEY>"

HOST_FORMAT='<HOST_FORMAT>'
TOKEN='<TOKEN>'
2 changes: 1 addition & 1 deletion ct-app/.envrc
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use flake
use flake

VIRTUAL_ENV="$PWD/.venv"
export VIRTUAL_ENV
Expand Down
28 changes: 6 additions & 22 deletions ct-app/core/__main__.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
import asyncio
from signal import SIGINT, SIGTERM

import click
import yaml
from prometheus_client import start_http_server

from .components.asyncloop import AsyncLoop
from .components.parameters import Parameters
from .components.utils import Utils
from .core import Core
Expand All @@ -20,16 +18,12 @@ def main(configfile: str = None):
# import envvars to params, such as self.params.subgraph.deployer_key
params = Parameters()
params.parse(config)
params.from_env("SUBGRAPH_", "PG")
params.overrides("OVERRIDE_")
params.from_env("SUBGRAPH", "PG")
params.overrides("OVERRIDE")

# create the core and nodes instances
instance = Core()
nodes = Node.fromAddressAndKeyLists(
*Utils.nodesAddresses("NODE_ADDRESS_", "NODE_KEY_")
)

instance.post_init(nodes, params)
nodes = Node.fromCredentials(*Utils.nodesCredentials("NODE_ADDRESS", "NODE_KEY"))
instance = Core(nodes, params)

# start the prometheus client
try:
Expand All @@ -39,17 +33,7 @@ def main(configfile: str = None):
else:
instance.info("Prometheus client started on port 8080")

loop = asyncio.new_event_loop()
loop.add_signal_handler(SIGINT, instance.stop)
loop.add_signal_handler(SIGTERM, instance.stop)

try:
loop.run_until_complete(instance.start())
except asyncio.CancelledError:
instance.error("Stopping the instance...")
finally:
instance.stop()
loop.close()
AsyncLoop.run(instance.start)


if __name__ == "__main__":
Expand Down
52 changes: 52 additions & 0 deletions ct-app/core/components/asyncloop.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import asyncio
from signal import SIGINT, SIGTERM
from typing import Callable

from .baseclass import Base
from .singleton import Singleton


class AsyncLoop(Base, metaclass=Singleton):
def __init__(self):
self.loop = asyncio.new_event_loop()
self.tasks = set[asyncio.Task]()

self.loop.add_signal_handler(SIGINT, self.stop)
self.loop.add_signal_handler(SIGTERM, self.stop)

@classmethod
def hasRunningTasks(cls) -> bool:
return bool(cls().tasks)

@classmethod
def run(cls, process: Callable):
try:
cls().loop.run_until_complete(process())
except asyncio.CancelledError:
cls().error("Stopping the instance...")
finally:
cls().stop()

@classmethod
def update(cls, tasks: set[Callable]):
for task in tasks:
cls().add(task)

@classmethod
def add(cls, callback: Callable):
task = asyncio.ensure_future(callback())
cls().tasks.add(task)

@classmethod
async def gather(cls):
await asyncio.gather(*cls().tasks)

@classmethod
def stop(cls):
for task in cls().tasks:
task.add_done_callback(cls().tasks.discard)
task.cancel()

@property
def print_prefix(self) -> str:
return "asyncloop"
24 changes: 16 additions & 8 deletions ct-app/core/components/channelstatus.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,22 @@ class ChannelStatus(Enum):
PendingToClose = "PendingToClose"
Closed = "Closed"

@classmethod
def isPending(cls, value: str):
return value == cls.PendingToClose.value
@property
def isPending(self):
return self == self.PendingToClose

@classmethod
def isOpen(cls, value: str):
return value == cls.Open.value
@property
def isOpen(self):
return self == self.Open

@property
def isClosed(self):
return self == self.Closed

@classmethod
def isClosed(cls, value: str):
return value == cls.Closed.value
def fromString(cls, value: str):
for status in cls:
if status.value == value:
return status

return None
24 changes: 16 additions & 8 deletions ct-app/core/components/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,11 @@ async def wrapper(self, *args, **kwargs):
raise AttributeError(f"Feature `{func.__name__}` not in config file")

index = params_clean.index(func_name_clean)
if getattr(class_flags, params_raw[index]) is None:
self.error(f"Feature `{params_raw[index]}` not yet available")
feature = params_raw[index]
flag = getattr(class_flags, feature)

if flag is None or flag == "off":
self.error(f"Feature `{feature}` not yet available")
return

return await func(self, *args, **kwargs)
Expand Down Expand Up @@ -75,19 +78,24 @@ async def wrapper(self, *args, **kwargs):
index = params_clean.index(func_name_clean)
delay = getattr(class_flags, params_raw[index])

if delay is not None:
if delay == "on":
delay = 0
if delay == "off":
delay = None

if delay == 0:
self.info(f"Running `{params_raw[index]}` continuously")
elif delay is not None:
self.info(f"Running `{params_raw[index]}` every {delay} seconds")

while self.started:
while self.running:
if message:
self.feature(message)
await func(self, *args, **kwargs)

if delay == 0:
if delay is None:
break

if delay is not None:
await asyncio.sleep(delay)
await asyncio.sleep(delay)

return wrapper

Expand Down
7 changes: 0 additions & 7 deletions ct-app/core/components/environment_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,6 @@ class EnvironmentUtils(Base):
def print_prefix(self) -> str:
return "EnvUtils"

@classmethod
def envvar(cls, var_name: str, default: Any = None, type: type = str):
if var_name in environ:
return type(environ[var_name])
else:
return default

@classmethod
def envvarWithPrefix(cls, prefix: str, type=str) -> dict[str, Any]:
jeandemeusy marked this conversation as resolved.
Show resolved Hide resolved
var_dict = {
Expand Down
3 changes: 2 additions & 1 deletion ct-app/core/components/graphql_providers.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import asyncio
from pathlib import Path
from typing import Union

from gql import Client, gql
from gql.transport.aiohttp import AIOHTTPTransport
Expand All @@ -21,7 +22,7 @@ def __init__(self, url: str):
self._default_key = None

#### PRIVATE METHODS ####
def _load_query(self, path: str or Path) -> DocumentNode:
def _load_query(self, path: Union[str, Path]) -> DocumentNode:
"""
Loads a graphql query from a file.
:param path: Path to the file. The path must be relative to the ct-app folder.
Expand Down
9 changes: 8 additions & 1 deletion ct-app/core/components/hoprd_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from urllib3.exceptions import MaxRetryError

from .baseclass import Base
from .channelstatus import ChannelStatus

MESSAGE_TAG = 800

Expand Down Expand Up @@ -233,7 +234,13 @@ async def all_channels(self, include_closed: bool):
including_closed="true" if include_closed else "false",
)

return response if is_ok else []
if not is_ok:
return []

for channel in response.all:
channel.status = ChannelStatus.fromString(channel.status)

return response

async def peers(
self,
Expand Down
Loading
Loading