diff --git a/logicnet/protocol.py b/logicnet/protocol.py index aeae615e..d4494bce 100644 --- a/logicnet/protocol.py +++ b/logicnet/protocol.py @@ -60,4 +60,33 @@ def deserialize_response(self): return { "logic_answer": self.logic_answer, "logic_reasoning": self.logic_reasoning, - } \ No newline at end of file + } + + +class LogicRequest(pydantic.BaseModel): + """ + Logic Synapse for the LogicNet protocol + """ + + # MINER NEED TO FILL THIS INFORMATION + logic_question: str = pydantic.Field( + "", + description="Logic question to be answered by miner. It can be noised question from the raw logic question from synthetic loop.", + ) + logic_answer: Union[str, object] = pydantic.Field( + "", description="Short logic answer as a summary of the logic reasoning." + ) + logic_reasoning: str = pydantic.Field( + "", + description="Reasoning when answering the logic question", + ) + + # SYNAPSE INFORMATION + category: str = pydantic.Field( + "", + description="One of the categories in the Validator main.", + ) + timeout: int = pydantic.Field( + 64, + description="Timeout for the miner to answer the logic question.", + ) diff --git a/logicnet/utils/config.py b/logicnet/utils/config.py index a8e195e2..af1dd1dc 100644 --- a/logicnet/utils/config.py +++ b/logicnet/utils/config.py @@ -119,7 +119,14 @@ def add_args(cls, parser): "--proxy.proxy_client_url", type=str, help="The url initialize credentials for proxy.", - default="http://logicapi.aitprotocol.ai/proxy_client", + default="https://logicnet.aitprotocol.ai/proxy_client", + ) + + parser.add_argument( + "--storage.storage_url", + type=str, + help="The url initialize to store miner's information.", + default="https://logicnet.aitprotocol.ai/proxy_client/store_miner_information", ) parser.add_argument( diff --git a/logicnet/validator/miner_manager.py b/logicnet/validator/miner_manager.py index a1e211ed..9e941887 100644 --- a/logicnet/validator/miner_manager.py +++ b/logicnet/validator/miner_manager.py @@ -41,6 +41,8 @@ def __repr__(self): return str(self.to_dict()) + "\n" def to_dict(self): + # Round score to 4 decimal places + self.scores = [round(score, 3) for score in self.scores][-NO_OF_RECENT_SCORES:] return { "category": self.category, "scores": self.scores, @@ -56,6 +58,9 @@ def __init__(self, validator): self.all_uids = [int(uid.item()) for uid in self.validator.metagraph.uids] self.all_uids_info = {uid: MinerInfo() for uid in self.all_uids} + def to_dict(self): + return {uid: info.to_dict() for uid, info in self.all_uids_info.items()} + def get_miner_info(self): """ QUERY MINER's INFORMATION SYNAPSE diff --git a/neurons/validator/validator.py b/neurons/validator/validator.py index 0251e70b..adea7e52 100644 --- a/neurons/validator/validator.py +++ b/neurons/validator/validator.py @@ -9,6 +9,8 @@ import traceback import threading from neurons.validator.core.serving_queue import QueryQueue +import requests + def init_category(config=None): category = { @@ -30,6 +32,7 @@ def init_category(config=None): } return category + class Validator(BaseValidatorNeuron): def __init__(self, config=None): """ @@ -62,7 +65,7 @@ def forward(self): Query miners by batched from the serving queue then process challenge-generating -> querying -> rewarding in background by threads DEFAULT: 16 miners per batch, 600 seconds per loop. """ - + self.store_miner_infomation() bt.logging.info("Updating available models & uids") async_batch_size = self.config.async_batch_size loop_base_time = self.config.loop_base_time # default is 600 seconds @@ -98,6 +101,7 @@ def forward(self): "Loop completed, uids info:\n", str(self.miner_manager.all_uids_info).replace("},", "},\n"), ) + self.store_miner_infomation() actual_time_taken = time.time() - loop_start @@ -243,6 +247,25 @@ def load_state(self): self.step = 0 bt.logging.info("Could not find previously saved state.", e) + def store_miner_infomation(self): + miner_informations = self.miner_manager.to_dict() + + def _post_miner_informations(miner_informations): + requests.post( + url=self.config.storage.storage_url, + json={ + "miner_information": miner_informations, + "validator_uid": int(self.uid), + }, + ) + + thread = threading.Thread( + target=_post_miner_informations, + args=(miner_informations,), + ) + thread.start() + + # The main function parses the configuration and runs the validator. if __name__ == "__main__": with Validator() as validator: diff --git a/neurons/validator/validator_proxy.py b/neurons/validator/validator_proxy.py index 96988871..dcc2112f 100644 --- a/neurons/validator/validator_proxy.py +++ b/neurons/validator/validator_proxy.py @@ -1,6 +1,5 @@ from fastapi import FastAPI, HTTPException, Depends from pydantic import BaseModel -from typing import Optional from concurrent.futures import ThreadPoolExecutor import uvicorn from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PublicKey @@ -14,10 +13,15 @@ import httpx import threading + class OrganicRequest(BaseModel): authorization: str - payload: Optional[logicnet.protocol.LogicSynapse] = None - re_check: bool = False + synapse_request: logicnet.protocol.LogicRequest + + +class Recheck(BaseModel): + authorization: str + class ValidatorProxy: """ @@ -39,6 +43,12 @@ def __init__( methods=["POST"], dependencies=[Depends(self.get_self)], ) + self.app.add_api_route( + "/recheck", + self.re_check, + methods=["POST"], + dependencies=[Depends(self.get_self)], + ) self.loop = asyncio.get_event_loop() if self.validator.config.proxy.port: self.start_server() @@ -48,11 +58,7 @@ def get_credentials(self): response = client.post( f"{self.validator.config.proxy.proxy_client_url}/get_credentials", json={ - "postfix": ( - f":{self.validator.config.proxy.port}/validator_proxy" - if self.validator.config.proxy.port - else "" - ), + "port": self.validator.config.proxy.port, "uid": self.validator.uid, }, ) @@ -78,8 +84,8 @@ def start_server(self): ) def authenticate_token(self, public_key_bytes): - public_key_bytes = base64.b64decode(public_key_bytes) try: + public_key_bytes = base64.b64decode(public_key_bytes) self.verify_credentials(public_key_bytes) bt.logging.info("Successfully authenticated token") return public_key_bytes @@ -90,9 +96,13 @@ def authenticate_token(self, public_key_bytes): status_code=401, detail="Error getting authentication token" ) - def organic_reward( - self, synapse, response, uid, rewarder, timeout - ): + def re_check(self, data: Recheck): + self.authenticate_token(data.authorization) + bt.logging.info("Rechecking validators") + self.get_credentials() + return {"message": "done"} + + def organic_reward(self, synapse, response, uid, rewarder, timeout): if callable(rewarder): uids, rewards = rewarder([uid], [response], synapse) else: @@ -115,12 +125,8 @@ def organic_reward( async def forward(self, data: OrganicRequest): self.authenticate_token(data.authorization) - synapse = data.payload - if data.re_check: - bt.logging.info("Rechecking validators") - self.get_credentials() - return {"message": "done"} bt.logging.info("Received an organic request!") + synapse = logicnet.protocol.LogicSynapse(**data.synapse_request.dict()) category = synapse.category category_config = self.validator.categories[category] @@ -135,8 +141,11 @@ async def forward(self, data: OrganicRequest): output = None for uid, should_reward in self.validator.query_queue.get_query_for_proxy( category - ): - should_reward = random.random() < self.validator.config.proxy.checking_probability or should_reward + ): + should_reward = ( + random.random() < self.validator.config.proxy.checking_probability + or should_reward + ) bt.logging.info( f"Forwarding request to miner {uid} with recent scores: {self.validator.miner_manager.all_uids_info[uid].scores}" ) @@ -168,4 +177,4 @@ async def forward(self, data: OrganicRequest): return HTTPException(status_code=500, detail="No valid response received") async def get_self(self): - return self \ No newline at end of file + return self