Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 30 additions & 1 deletion logicnet/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,33 @@ def deserialize_response(self):
return {
"logic_answer": self.logic_answer,
"logic_reasoning": self.logic_reasoning,
}
}


class LogicRequest(pydantic.BaseModel):
"""
Logic Synapse for the LogicNet protocol
"""

# MINER NEED TO FILL THIS INFORMATION
logic_question: str = pydantic.Field(
"",
description="Logic question to be answered by miner. It can be noised question from the raw logic question from synthetic loop.",
)
logic_answer: Union[str, object] = pydantic.Field(
"", description="Short logic answer as a summary of the logic reasoning."
)
logic_reasoning: str = pydantic.Field(
"",
description="Reasoning when answering the logic question",
)

# SYNAPSE INFORMATION
category: str = pydantic.Field(
"",
description="One of the categories in the Validator main.",
)
timeout: int = pydantic.Field(
64,
description="Timeout for the miner to answer the logic question.",
)
9 changes: 8 additions & 1 deletion logicnet/utils/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,14 @@ def add_args(cls, parser):
"--proxy.proxy_client_url",
type=str,
help="The url initialize credentials for proxy.",
default="http://logicapi.aitprotocol.ai/proxy_client",
default="https://logicnet.aitprotocol.ai/proxy_client",
)

parser.add_argument(
"--storage.storage_url",
type=str,
help="The url initialize to store miner's information.",
default="https://logicnet.aitprotocol.ai/proxy_client/store_miner_information",
)

parser.add_argument(
Expand Down
5 changes: 5 additions & 0 deletions logicnet/validator/miner_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ def __repr__(self):
return str(self.to_dict()) + "\n"

def to_dict(self):
# Round score to 4 decimal places
self.scores = [round(score, 3) for score in self.scores][-NO_OF_RECENT_SCORES:]
return {
"category": self.category,
"scores": self.scores,
Expand All @@ -56,6 +58,9 @@ def __init__(self, validator):
self.all_uids = [int(uid.item()) for uid in self.validator.metagraph.uids]
self.all_uids_info = {uid: MinerInfo() for uid in self.all_uids}

def to_dict(self):
return {uid: info.to_dict() for uid, info in self.all_uids_info.items()}

def get_miner_info(self):
"""
QUERY MINER's INFORMATION SYNAPSE
Expand Down
25 changes: 24 additions & 1 deletion neurons/validator/validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import traceback
import threading
from neurons.validator.core.serving_queue import QueryQueue
import requests


def init_category(config=None):
category = {
Expand All @@ -30,6 +32,7 @@ def init_category(config=None):
}
return category


class Validator(BaseValidatorNeuron):
def __init__(self, config=None):
"""
Expand Down Expand Up @@ -62,7 +65,7 @@ def forward(self):
Query miners by batched from the serving queue then process challenge-generating -> querying -> rewarding in background by threads
DEFAULT: 16 miners per batch, 600 seconds per loop.
"""

self.store_miner_infomation()
bt.logging.info("Updating available models & uids")
async_batch_size = self.config.async_batch_size
loop_base_time = self.config.loop_base_time # default is 600 seconds
Expand Down Expand Up @@ -98,6 +101,7 @@ def forward(self):
"Loop completed, uids info:\n",
str(self.miner_manager.all_uids_info).replace("},", "},\n"),
)
self.store_miner_infomation()

actual_time_taken = time.time() - loop_start

Expand Down Expand Up @@ -243,6 +247,25 @@ def load_state(self):
self.step = 0
bt.logging.info("Could not find previously saved state.", e)

def store_miner_infomation(self):
miner_informations = self.miner_manager.to_dict()

def _post_miner_informations(miner_informations):
requests.post(
url=self.config.storage.storage_url,
json={
"miner_information": miner_informations,
"validator_uid": int(self.uid),
},
)

thread = threading.Thread(
target=_post_miner_informations,
args=(miner_informations,),
)
thread.start()


# The main function parses the configuration and runs the validator.
if __name__ == "__main__":
with Validator() as validator:
Expand Down
49 changes: 29 additions & 20 deletions neurons/validator/validator_proxy.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from fastapi import FastAPI, HTTPException, Depends
from pydantic import BaseModel
from typing import Optional
from concurrent.futures import ThreadPoolExecutor
import uvicorn
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PublicKey
Expand All @@ -14,10 +13,15 @@
import httpx
import threading


class OrganicRequest(BaseModel):
authorization: str
payload: Optional[logicnet.protocol.LogicSynapse] = None
re_check: bool = False
synapse_request: logicnet.protocol.LogicRequest


class Recheck(BaseModel):
authorization: str


class ValidatorProxy:
"""
Expand All @@ -39,6 +43,12 @@ def __init__(
methods=["POST"],
dependencies=[Depends(self.get_self)],
)
self.app.add_api_route(
"/recheck",
self.re_check,
methods=["POST"],
dependencies=[Depends(self.get_self)],
)
self.loop = asyncio.get_event_loop()
if self.validator.config.proxy.port:
self.start_server()
Expand All @@ -48,11 +58,7 @@ def get_credentials(self):
response = client.post(
f"{self.validator.config.proxy.proxy_client_url}/get_credentials",
json={
"postfix": (
f":{self.validator.config.proxy.port}/validator_proxy"
if self.validator.config.proxy.port
else ""
),
"port": self.validator.config.proxy.port,
"uid": self.validator.uid,
},
)
Expand All @@ -78,8 +84,8 @@ def start_server(self):
)

def authenticate_token(self, public_key_bytes):
public_key_bytes = base64.b64decode(public_key_bytes)
try:
public_key_bytes = base64.b64decode(public_key_bytes)
self.verify_credentials(public_key_bytes)
bt.logging.info("Successfully authenticated token")
return public_key_bytes
Expand All @@ -90,9 +96,13 @@ def authenticate_token(self, public_key_bytes):
status_code=401, detail="Error getting authentication token"
)

def organic_reward(
self, synapse, response, uid, rewarder, timeout
):
def re_check(self, data: Recheck):
self.authenticate_token(data.authorization)
bt.logging.info("Rechecking validators")
self.get_credentials()
return {"message": "done"}

def organic_reward(self, synapse, response, uid, rewarder, timeout):
if callable(rewarder):
uids, rewards = rewarder([uid], [response], synapse)
else:
Expand All @@ -115,12 +125,8 @@ def organic_reward(

async def forward(self, data: OrganicRequest):
self.authenticate_token(data.authorization)
synapse = data.payload
if data.re_check:
bt.logging.info("Rechecking validators")
self.get_credentials()
return {"message": "done"}
bt.logging.info("Received an organic request!")
synapse = logicnet.protocol.LogicSynapse(**data.synapse_request.dict())

category = synapse.category
category_config = self.validator.categories[category]
Expand All @@ -135,8 +141,11 @@ async def forward(self, data: OrganicRequest):
output = None
for uid, should_reward in self.validator.query_queue.get_query_for_proxy(
category
):
should_reward = random.random() < self.validator.config.proxy.checking_probability or should_reward
):
should_reward = (
random.random() < self.validator.config.proxy.checking_probability
or should_reward
)
bt.logging.info(
f"Forwarding request to miner {uid} with recent scores: {self.validator.miner_manager.all_uids_info[uid].scores}"
)
Expand Down Expand Up @@ -168,4 +177,4 @@ async def forward(self, data: OrganicRequest):
return HTTPException(status_code=500, detail="No valid response received")

async def get_self(self):
return self
return self