diff --git a/build_scripts/installer-version.py b/build_scripts/installer-version.py index d9d71e02e..e7515d81c 100644 --- a/build_scripts/installer-version.py +++ b/build_scripts/installer-version.py @@ -8,6 +8,9 @@ def main(): windows = len(sys.argv) > 1 and "win" in sys.argv[1] # Special case windows to 0.1.6225 + print("0.1.4") + return + scm_full_version = get_version(root="..", relative_to=__file__) # scm_full_version = "1.0.5.dev22" os.environ["SCM_VERSION"] = scm_full_version diff --git a/flax/farmer/farmer.py b/flax/farmer/farmer.py index f4983c1d5..d4d5a59b4 100644 --- a/flax/farmer/farmer.py +++ b/flax/farmer/farmer.py @@ -2,6 +2,7 @@ import json import logging import time +from asyncio import sleep from pathlib import Path from typing import Any, Callable, Dict, List, Optional, Tuple import traceback @@ -12,12 +13,16 @@ import flax.server.ws_connection as ws # lgtm [py/import-and-import-from] from flax.consensus.coinbase import create_puzzlehash_for_pk from flax.consensus.constants import ConsensusConstants +from flax.consensus.pot_iterations import calculate_sp_interval_iters from flax.daemon.keychain_proxy import ( KeychainProxy, KeychainProxyConnectionFailure, connect_to_keychain_and_validate, wrap_local_keychain, ) +from flax.farmer.pooling.constants import constants +from flax.farmer.pooling.og_pool_state import OgPoolState +from flax.farmer.pooling.pool_api_client import PoolApiClient from flax.pools.pool_config import PoolWalletConfig, load_pool_config from flax.protocols import farmer_protocol, harvester_protocol from flax.protocols.pool_protocol import ( @@ -38,7 +43,7 @@ from flax.ssl.create_ssl import get_mozilla_ca_crt from flax.types.blockchain_format.proof_of_space import ProofOfSpace from flax.types.blockchain_format.sized_bytes import bytes32 -from flax.util.bech32m import decode_puzzle_hash +from flax.util.bech32m import decode_puzzle_hash, encode_puzzle_hash from flax.util.byte_types import hexstr_to_bytes from flax.util.config import load_config, save_config, config_path_for_filename from flax.util.hash import std_hash @@ -182,10 +187,58 @@ async def setup_keys(self): self.harvester_cache: Dict[str, Dict[str, HarvesterCacheEntry]] = {} + # OG Pooling setup + self.pool_url = self.config.get("pool_url") + self.pool_payout_address = self.config.get("pool_payout_address") + self.pool_sub_slot_iters = constants.get("POOL_SUB_SLOT_ITERS") + self.iters_limit = calculate_sp_interval_iters(self.constants, self.pool_sub_slot_iters) + self.pool_minimum_difficulty: uint64 = uint64(1) + self.og_pool_state: OgPoolState = OgPoolState(difficulty=self.pool_minimum_difficulty) + self.pool_var_diff_target_in_seconds = 5 * 60 + self.pool_reward_target = self.pool_target + self.adjust_pool_difficulties_task: Optional[asyncio.Task] = None + self.check_pool_reward_target_task: Optional[asyncio.Task] = None + + def is_pooling_enabled(self): + return self.pool_url is not None and self.pool_payout_address is not None + async def _start(self): await self.setup_keys() self.update_pool_state_task = asyncio.create_task(self._periodically_update_pool_state_task()) self.cache_clear_task = asyncio.create_task(self._periodically_clear_cache_and_refresh_task()) + if not self.is_pooling_enabled(): + self.log.info(f"Not OG pooling as 'pool_payout_address' and/or 'pool_url' are missing in your config") + return + self.pool_api_client = PoolApiClient(self.pool_url) + await self.initialize_pooling() + self.adjust_pool_difficulty_task = asyncio.create_task(self._periodically_adjust_pool_difficulty_task()) + self.check_pool_reward_target_task = asyncio.create_task(self._periodically_check_pool_reward_target_task()) + + async def initialize_pooling(self): + pool_info: Dict = {} + has_pool_info = False + while not has_pool_info: + try: + pool_info = await self.pool_api_client.get_pool_info() + has_pool_info = True + except Exception as e: + self.log.error(f"Error retrieving OG pool info: {e}") + await sleep(5) + + pool_name = pool_info["name"] + self.log.info(f"Connected to OG pool {pool_name}") + self.pool_var_diff_target_in_seconds = pool_info["var_diff_target_in_seconds"] + + self.pool_minimum_difficulty = uint64(pool_info["minimum_difficulty"]) + self.og_pool_state.difficulty = self.pool_minimum_difficulty + + pool_target = bytes.fromhex(pool_info["target_puzzle_hash"][2:]) + assert len(pool_target) == 32 + self.pool_reward_target = pool_target + address_prefix = self.config["network_overrides"]["config"][self.config["selected_network"]]["address_prefix"] + pool_target_encoded = encode_puzzle_hash(pool_target, address_prefix) + if self.pool_target is not pool_target or self.pool_target_encoded is not pool_target_encoded: + self.set_reward_targets(farmer_target_encoded=None, pool_target_encoded=pool_target_encoded) def _close(self): self._shut_down = True @@ -193,6 +246,10 @@ def _close(self): async def _await_closed(self): await self.cache_clear_task await self.update_pool_state_task + if self.adjust_pool_difficulty_task is not None: + await self.adjust_pool_difficulty_task + if self.check_pool_reward_target_task is not None: + await self.check_pool_reward_target_task def _set_state_changed_callback(self, callback: Callable): self.state_changed_callback = callback @@ -717,3 +774,46 @@ async def _periodically_clear_cache_and_refresh_task(self): log.error(f"_periodically_clear_cache_and_refresh_task failed: {traceback.format_exc()}") await asyncio.sleep(1) + + async def _periodically_adjust_pool_difficulty_task(self): + time_slept = 0 + while not self._shut_down: + # Sleep in 1 sec intervals to quickly exit outer loop, but effectively sleep 60 sec between actual code runs + await sleep(1) + time_slept += 1 + if time_slept < 60: + continue + time_slept = 0 + if (time.time() - self.og_pool_state.last_partial_submit_timestamp) < self.pool_var_diff_target_in_seconds: + continue + diff_since_last_partial_submit_in_seconds = time.time() - self.og_pool_state.last_partial_submit_timestamp + missing_partial_submits = int( + diff_since_last_partial_submit_in_seconds // self.pool_var_diff_target_in_seconds) + new_difficulty = uint64(max( + (self.og_pool_state.difficulty - (missing_partial_submits * 2)), + self.pool_minimum_difficulty + )) + if new_difficulty == self.og_pool_state.difficulty: + continue + old_difficulty = self.og_pool_state.difficulty + self.og_pool_state.difficulty = new_difficulty + log.info( + f"Lowered the OG pool difficulty from {old_difficulty} to " + f"{new_difficulty} due to no partial submits within the last " + f"{int(round(diff_since_last_partial_submit_in_seconds))} seconds" + ) + + async def _periodically_check_pool_reward_target_task(self): + time_slept = 0 + while not self._shut_down: + # Sleep in 1 sec intervals to quickly exit outer loop, but effectively sleep 5 min between actual code runs + await sleep(1) + time_slept += 1 + if time_slept < 5 * 60: + continue + time_slept = 0 + if self.pool_target is self.pool_reward_target: + continue + address_prefix = self.config["network_overrides"]["config"][self.config["selected_network"]]["address_prefix"] + pool_target_encoded = encode_puzzle_hash(self.pool_reward_target, address_prefix) + self.set_reward_targets(farmer_target_encoded=None, pool_target_encoded=pool_target_encoded) diff --git a/flax/farmer/farmer_api.py b/flax/farmer/farmer_api.py index fc081bf70..66c1bc449 100644 --- a/flax/farmer/farmer_api.py +++ b/flax/farmer/farmer_api.py @@ -3,12 +3,13 @@ from typing import Callable, Optional, List, Any, Dict, Tuple import aiohttp -from blspy import AugSchemeMPL, G2Element, PrivateKey +from blspy import AugSchemeMPL, G2Element, G1Element, PrivateKey import flax.server.ws_connection as ws from flax.consensus.network_type import NetworkType from flax.consensus.pot_iterations import calculate_iterations_quality, calculate_sp_interval_iters from flax.farmer.farmer import Farmer +from flax.farmer.pooling.og_pool_protocol import PartialPayload, SubmitPartial from flax.protocols import farmer_protocol, harvester_protocol from flax.protocols.harvester_protocol import PoolDifficulty from flax.protocols.pool_protocol import ( @@ -23,6 +24,7 @@ from flax.ssl.create_ssl import get_mozilla_ca_crt from flax.types.blockchain_format.pool_target import PoolTarget from flax.types.blockchain_format.proof_of_space import ProofOfSpace +from flax.types.blockchain_format.sized_bytes import bytes32 from flax.util.api_decorators import api_request, peer_required from flax.util.ints import uint32, uint64 @@ -259,9 +261,17 @@ async def new_proof_of_space( self.farmer.log.error(f"Error sending partial to {pool_url}, {resp.status}") except Exception as e: self.farmer.log.error(f"Error connecting to pool: {e}") - return - return + pool_public_key = new_proof_of_space.proof.pool_public_key + if pool_public_key is not None and self.farmer.is_pooling_enabled(): + await self.process_new_proof_of_space_for_pool( + new_proof_of_space, + peer, + pool_public_key, + computed_quality_string + ) + + return @api_request async def respond_signatures(self, response: harvester_protocol.RespondSignatures): @@ -441,10 +451,15 @@ async def new_signage_point(self, new_signage_point: farmer_protocol.NewSignageP p2_singleton_puzzle_hash, ) ) + difficulty = new_signage_point.difficulty + sub_slot_iters = new_signage_point.sub_slot_iters + if self.farmer.is_pooling_enabled(): + sub_slot_iters = self.farmer.pool_sub_slot_iters + difficulty = self.farmer.og_pool_state.difficulty message = harvester_protocol.NewSignagePointHarvester( new_signage_point.challenge_hash, - new_signage_point.difficulty, - new_signage_point.sub_slot_iters, + difficulty, + sub_slot_iters, new_signage_point.signage_point_index, new_signage_point.challenge_chain_sp, pool_difficulties, @@ -509,6 +524,94 @@ async def farming_info(self, request: farmer_protocol.FarmingInfo): }, ) + async def process_new_proof_of_space_for_pool( + self, + new_proof_of_space: harvester_protocol.NewProofOfSpace, + peer: ws.WSFlaxConnection, + pool_public_key: G1Element, + computed_quality_string: bytes32 + ): + og_pool_state = self.farmer.og_pool_state + + # Otherwise, send the proof of space to the pool + # When we win a block, we also send the partial to the pool + required_iters = calculate_iterations_quality( + self.farmer.constants.DIFFICULTY_CONSTANT_FACTOR, + computed_quality_string, + new_proof_of_space.proof.size, + og_pool_state.difficulty, + new_proof_of_space.sp_hash, + ) + if required_iters >= self.farmer.iters_limit: + self.farmer.log.info( + f"Proof of space not good enough for OG pool difficulty of {og_pool_state.difficulty}" + ) + return + + # Submit partial to pool + is_eos = new_proof_of_space.signage_point_index == 0 + payload = PartialPayload( + new_proof_of_space.proof, + new_proof_of_space.sp_hash, + is_eos, + self.farmer.pool_payout_address + ) + + # The plot key is 2/2 so we need the harvester's half of the signature + m_to_sign = payload.get_hash() + request = harvester_protocol.RequestSignatures( + new_proof_of_space.plot_identifier, + new_proof_of_space.challenge_hash, + new_proof_of_space.sp_hash, + [m_to_sign], + ) + response: Any = await peer.request_signatures(request) + if not isinstance(response, harvester_protocol.RespondSignatures): + self.farmer.log.error(f"Invalid response from harvester: {response}") + return + + assert len(response.message_signatures) == 1 + + plot_signature: Optional[G2Element] = None + for sk in self.farmer.get_private_keys(): + pk = sk.get_g1() + if pk == response.farmer_pk: + agg_pk = ProofOfSpace.generate_plot_public_key(response.local_pk, pk) + assert agg_pk == new_proof_of_space.proof.plot_public_key + sig_farmer = AugSchemeMPL.sign(sk, m_to_sign, agg_pk) + plot_signature = AugSchemeMPL.aggregate([sig_farmer, response.message_signatures[0][1]]) + assert AugSchemeMPL.verify(agg_pk, m_to_sign, plot_signature) + pool_sk = self.farmer.pool_sks_map[bytes(pool_public_key)] + authentication_signature = AugSchemeMPL.sign(pool_sk, m_to_sign) + + assert plot_signature is not None + agg_sig: G2Element = AugSchemeMPL.aggregate([plot_signature, authentication_signature]) + + submit_partial = SubmitPartial(payload, agg_sig, og_pool_state.difficulty) + self.farmer.log.debug("Submitting partial to OG pool ..") + og_pool_state.last_partial_submit_timestamp = time.time() + submit_partial_response: Dict + try: + submit_partial_response = await self.farmer.pool_api_client.submit_partial(submit_partial) + except Exception as e: + self.farmer.log.error(f"Error submitting partial to OG pool: {e}") + return + self.farmer.log.debug(f"OG pool response: {submit_partial_response}") + if "error_code" in submit_partial_response: + if submit_partial_response["error_code"] == 5: + self.farmer.log.info( + "Local OG pool difficulty too low, adjusting to OG pool difficulty " + f"({submit_partial_response['current_difficulty']})" + ) + og_pool_state.difficulty = uint64(submit_partial_response["current_difficulty"]) + else: + self.farmer.log.error( + f"Error in OG pooling: {submit_partial_response['error_code'], submit_partial_response['error_message']}" + ) + else: + self.farmer.log.info("The partial submitted to the OG pool was accepted") + og_pool_state.difficulty = uint64(submit_partial_response["current_difficulty"]) + @api_request @peer_required async def respond_plots(self, _: harvester_protocol.RespondPlots, peer: ws.WSFlaxConnection): diff --git a/flax/farmer/pooling/__init__.py b/flax/farmer/pooling/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/flax/farmer/pooling/constants.py b/flax/farmer/pooling/constants.py new file mode 100644 index 000000000..2854d78a8 --- /dev/null +++ b/flax/farmer/pooling/constants.py @@ -0,0 +1,5 @@ +from flax.util.ints import uint64 + +constants = { + "POOL_SUB_SLOT_ITERS": uint64(36718720) +} diff --git a/flax/farmer/pooling/og_pool_protocol.py b/flax/farmer/pooling/og_pool_protocol.py new file mode 100644 index 000000000..8cdb5db86 --- /dev/null +++ b/flax/farmer/pooling/og_pool_protocol.py @@ -0,0 +1,24 @@ +from dataclasses import dataclass + +from blspy import G2Element +from flax.types.blockchain_format.proof_of_space import ProofOfSpace +from flax.types.blockchain_format.sized_bytes import bytes32 +from flax.util.ints import uint64 +from flax.util.streamable import streamable, Streamable + + +@dataclass(frozen=True) +@streamable +class PartialPayload(Streamable): + proof_of_space: ProofOfSpace + sp_hash: bytes32 + end_of_sub_slot: bool + payout_address: str # The farmer can choose where to send the rewards. This can take a few minutes + + +@dataclass(frozen=True) +@streamable +class SubmitPartial(Streamable): + payload: PartialPayload + partial_aggregate_signature: G2Element # Sig of partial by plot key and pool key + difficulty: uint64 diff --git a/flax/farmer/pooling/og_pool_state.py b/flax/farmer/pooling/og_pool_state.py new file mode 100644 index 000000000..aa6bbfa12 --- /dev/null +++ b/flax/farmer/pooling/og_pool_state.py @@ -0,0 +1,12 @@ +import time + +from flax.util.ints import uint64 + + +class OgPoolState: + difficulty: uint64 + last_partial_submit_timestamp: float + + def __init__(self, difficulty: uint64 = 1, last_partial_submit_timestamp: float = time.time()): + self.difficulty = difficulty + self.last_partial_submit_timestamp = last_partial_submit_timestamp diff --git a/flax/farmer/pooling/pool_api_client.py b/flax/farmer/pooling/pool_api_client.py new file mode 100644 index 000000000..f0956d4bb --- /dev/null +++ b/flax/farmer/pooling/pool_api_client.py @@ -0,0 +1,29 @@ +from aiohttp import ClientSession, ClientTimeout + +from flax.farmer.pooling.og_pool_protocol import SubmitPartial +from flax.server.server import ssl_context_for_root +from flax.ssl.create_ssl import get_mozilla_ca_crt + +timeout = ClientTimeout(total=30) + + +class PoolApiClient: + base_url: str + + def __init__(self, base_url: str) -> None: + self.base_url = base_url + self.ssl_context = ssl_context_for_root(get_mozilla_ca_crt()) + + async def get_pool_info(self): + async with ClientSession(timeout=timeout) as client: + async with client.get(f"{self.base_url}/pool_info", ssl=self.ssl_context) as res: + return await res.json() + + async def submit_partial(self, submit_partial: SubmitPartial): + async with ClientSession(timeout=timeout) as client: + async with client.post( + f"{self.base_url}/partial", + json=submit_partial.to_json_dict(), + ssl=self.ssl_context + ) as res: + return await res.json() diff --git a/pyproject.toml b/pyproject.toml index a34b6b5e9..ab7a1ba2c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,9 +1,5 @@ [build-system] -requires = ["setuptools>=42", "wheel", "setuptools_scm[toml]>=4.1.2"] -build-backend = "setuptools.build_meta" - -[tool.setuptools_scm] -local_scheme = "no-local-version" +requires = ["setuptools>=42", "wheel", "pip"] [tool.black] line-length = 120 diff --git a/setup.py b/setup.py index 5786243cd..36237b8f9 100644 --- a/setup.py +++ b/setup.py @@ -68,6 +68,7 @@ "flax.full_node", "flax.timelord", "flax.farmer", + "flax.farmer.pooling", "flax.harvester", "flax.introducer", "flax.plotting", @@ -110,7 +111,7 @@ "flax.ssl": ["flax_ca.crt", "flax_ca.key", "dst_root_ca.pem"], "mozilla-ca": ["cacert.pem"], }, - use_scm_version={"fallback_version": "unknown-no-.git-directory"}, + version="0.1.4", long_description=open("README.md").read(), long_description_content_type="text/markdown", zip_safe=False,