Skip to content
This repository has been archived by the owner on Jul 22, 2022. It is now read-only.

Commit

Permalink
Add OG pooling
Browse files Browse the repository at this point in the history
  • Loading branch information
felixbrucker committed Oct 29, 2021
1 parent 03079a5 commit 5b49033
Show file tree
Hide file tree
Showing 10 changed files with 285 additions and 12 deletions.
3 changes: 3 additions & 0 deletions build_scripts/installer-version.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
def main():
windows = len(sys.argv) > 1 and "win" in sys.argv[1] # Special case windows to 0.1.6225

print("0.1.4")
return

scm_full_version = get_version(root="..", relative_to=__file__)
# scm_full_version = "1.0.5.dev22"
os.environ["SCM_VERSION"] = scm_full_version
Expand Down
102 changes: 101 additions & 1 deletion flax/farmer/farmer.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import json
import logging
import time
from asyncio import sleep
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Tuple
import traceback
Expand All @@ -12,12 +13,16 @@
import flax.server.ws_connection as ws # lgtm [py/import-and-import-from]
from flax.consensus.coinbase import create_puzzlehash_for_pk
from flax.consensus.constants import ConsensusConstants
from flax.consensus.pot_iterations import calculate_sp_interval_iters
from flax.daemon.keychain_proxy import (
KeychainProxy,
KeychainProxyConnectionFailure,
connect_to_keychain_and_validate,
wrap_local_keychain,
)
from flax.farmer.pooling.constants import constants
from flax.farmer.pooling.og_pool_state import OgPoolState
from flax.farmer.pooling.pool_api_client import PoolApiClient
from flax.pools.pool_config import PoolWalletConfig, load_pool_config
from flax.protocols import farmer_protocol, harvester_protocol
from flax.protocols.pool_protocol import (
Expand All @@ -38,7 +43,7 @@
from flax.ssl.create_ssl import get_mozilla_ca_crt
from flax.types.blockchain_format.proof_of_space import ProofOfSpace
from flax.types.blockchain_format.sized_bytes import bytes32
from flax.util.bech32m import decode_puzzle_hash
from flax.util.bech32m import decode_puzzle_hash, encode_puzzle_hash
from flax.util.byte_types import hexstr_to_bytes
from flax.util.config import load_config, save_config, config_path_for_filename
from flax.util.hash import std_hash
Expand Down Expand Up @@ -182,17 +187,69 @@ async def setup_keys(self):

self.harvester_cache: Dict[str, Dict[str, HarvesterCacheEntry]] = {}

# OG Pooling setup
self.pool_url = self.config.get("pool_url")
self.pool_payout_address = self.config.get("pool_payout_address")
self.pool_sub_slot_iters = constants.get("POOL_SUB_SLOT_ITERS")
self.iters_limit = calculate_sp_interval_iters(self.constants, self.pool_sub_slot_iters)
self.pool_minimum_difficulty: uint64 = uint64(1)
self.og_pool_state: OgPoolState = OgPoolState(difficulty=self.pool_minimum_difficulty)
self.pool_var_diff_target_in_seconds = 5 * 60
self.pool_reward_target = self.pool_target
self.adjust_pool_difficulties_task: Optional[asyncio.Task] = None
self.check_pool_reward_target_task: Optional[asyncio.Task] = None

def is_pooling_enabled(self):
return self.pool_url is not None and self.pool_payout_address is not None

async def _start(self):
await self.setup_keys()
self.update_pool_state_task = asyncio.create_task(self._periodically_update_pool_state_task())
self.cache_clear_task = asyncio.create_task(self._periodically_clear_cache_and_refresh_task())
if not self.is_pooling_enabled():
self.log.info(f"Not OG pooling as 'pool_payout_address' and/or 'pool_url' are missing in your config")
return
self.pool_api_client = PoolApiClient(self.pool_url)
await self.initialize_pooling()
self.adjust_pool_difficulty_task = asyncio.create_task(self._periodically_adjust_pool_difficulty_task())
self.check_pool_reward_target_task = asyncio.create_task(self._periodically_check_pool_reward_target_task())

async def initialize_pooling(self):
pool_info: Dict = {}
has_pool_info = False
while not has_pool_info:
try:
pool_info = await self.pool_api_client.get_pool_info()
has_pool_info = True
except Exception as e:
self.log.error(f"Error retrieving OG pool info: {e}")
await sleep(5)

pool_name = pool_info["name"]
self.log.info(f"Connected to OG pool {pool_name}")
self.pool_var_diff_target_in_seconds = pool_info["var_diff_target_in_seconds"]

self.pool_minimum_difficulty = uint64(pool_info["minimum_difficulty"])
self.og_pool_state.difficulty = self.pool_minimum_difficulty

pool_target = bytes.fromhex(pool_info["target_puzzle_hash"][2:])
assert len(pool_target) == 32
self.pool_reward_target = pool_target
address_prefix = self.config["network_overrides"]["config"][self.config["selected_network"]]["address_prefix"]
pool_target_encoded = encode_puzzle_hash(pool_target, address_prefix)
if self.pool_target is not pool_target or self.pool_target_encoded is not pool_target_encoded:
self.set_reward_targets(farmer_target_encoded=None, pool_target_encoded=pool_target_encoded)

def _close(self):
self._shut_down = True

async def _await_closed(self):
await self.cache_clear_task
await self.update_pool_state_task
if self.adjust_pool_difficulty_task is not None:
await self.adjust_pool_difficulty_task
if self.check_pool_reward_target_task is not None:
await self.check_pool_reward_target_task

def _set_state_changed_callback(self, callback: Callable):
self.state_changed_callback = callback
Expand Down Expand Up @@ -717,3 +774,46 @@ async def _periodically_clear_cache_and_refresh_task(self):
log.error(f"_periodically_clear_cache_and_refresh_task failed: {traceback.format_exc()}")

await asyncio.sleep(1)

async def _periodically_adjust_pool_difficulty_task(self):
time_slept = 0
while not self._shut_down:
# Sleep in 1 sec intervals to quickly exit outer loop, but effectively sleep 60 sec between actual code runs
await sleep(1)
time_slept += 1
if time_slept < 60:
continue
time_slept = 0
if (time.time() - self.og_pool_state.last_partial_submit_timestamp) < self.pool_var_diff_target_in_seconds:
continue
diff_since_last_partial_submit_in_seconds = time.time() - self.og_pool_state.last_partial_submit_timestamp
missing_partial_submits = int(
diff_since_last_partial_submit_in_seconds // self.pool_var_diff_target_in_seconds)
new_difficulty = uint64(max(
(self.og_pool_state.difficulty - (missing_partial_submits * 2)),
self.pool_minimum_difficulty
))
if new_difficulty == self.og_pool_state.difficulty:
continue
old_difficulty = self.og_pool_state.difficulty
self.og_pool_state.difficulty = new_difficulty
log.info(
f"Lowered the OG pool difficulty from {old_difficulty} to "
f"{new_difficulty} due to no partial submits within the last "
f"{int(round(diff_since_last_partial_submit_in_seconds))} seconds"
)

async def _periodically_check_pool_reward_target_task(self):
time_slept = 0
while not self._shut_down:
# Sleep in 1 sec intervals to quickly exit outer loop, but effectively sleep 5 min between actual code runs
await sleep(1)
time_slept += 1
if time_slept < 5 * 60:
continue
time_slept = 0
if self.pool_target is self.pool_reward_target:
continue
address_prefix = self.config["network_overrides"]["config"][self.config["selected_network"]]["address_prefix"]
pool_target_encoded = encode_puzzle_hash(self.pool_reward_target, address_prefix)
self.set_reward_targets(farmer_target_encoded=None, pool_target_encoded=pool_target_encoded)
113 changes: 108 additions & 5 deletions flax/farmer/farmer_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@
from typing import Callable, Optional, List, Any, Dict, Tuple

import aiohttp
from blspy import AugSchemeMPL, G2Element, PrivateKey
from blspy import AugSchemeMPL, G2Element, G1Element, PrivateKey

import flax.server.ws_connection as ws
from flax.consensus.network_type import NetworkType
from flax.consensus.pot_iterations import calculate_iterations_quality, calculate_sp_interval_iters
from flax.farmer.farmer import Farmer
from flax.farmer.pooling.og_pool_protocol import PartialPayload, SubmitPartial
from flax.protocols import farmer_protocol, harvester_protocol
from flax.protocols.harvester_protocol import PoolDifficulty
from flax.protocols.pool_protocol import (
Expand All @@ -23,6 +24,7 @@
from flax.ssl.create_ssl import get_mozilla_ca_crt
from flax.types.blockchain_format.pool_target import PoolTarget
from flax.types.blockchain_format.proof_of_space import ProofOfSpace
from flax.types.blockchain_format.sized_bytes import bytes32
from flax.util.api_decorators import api_request, peer_required
from flax.util.ints import uint32, uint64

Expand Down Expand Up @@ -259,9 +261,17 @@ async def new_proof_of_space(
self.farmer.log.error(f"Error sending partial to {pool_url}, {resp.status}")
except Exception as e:
self.farmer.log.error(f"Error connecting to pool: {e}")
return

return
pool_public_key = new_proof_of_space.proof.pool_public_key
if pool_public_key is not None and self.farmer.is_pooling_enabled():
await self.process_new_proof_of_space_for_pool(
new_proof_of_space,
peer,
pool_public_key,
computed_quality_string
)

return

@api_request
async def respond_signatures(self, response: harvester_protocol.RespondSignatures):
Expand Down Expand Up @@ -441,10 +451,15 @@ async def new_signage_point(self, new_signage_point: farmer_protocol.NewSignageP
p2_singleton_puzzle_hash,
)
)
difficulty = new_signage_point.difficulty
sub_slot_iters = new_signage_point.sub_slot_iters
if self.farmer.is_pooling_enabled():
sub_slot_iters = self.farmer.pool_sub_slot_iters
difficulty = self.farmer.og_pool_state.difficulty
message = harvester_protocol.NewSignagePointHarvester(
new_signage_point.challenge_hash,
new_signage_point.difficulty,
new_signage_point.sub_slot_iters,
difficulty,
sub_slot_iters,
new_signage_point.signage_point_index,
new_signage_point.challenge_chain_sp,
pool_difficulties,
Expand Down Expand Up @@ -509,6 +524,94 @@ async def farming_info(self, request: farmer_protocol.FarmingInfo):
},
)

async def process_new_proof_of_space_for_pool(
self,
new_proof_of_space: harvester_protocol.NewProofOfSpace,
peer: ws.WSFlaxConnection,
pool_public_key: G1Element,
computed_quality_string: bytes32
):
og_pool_state = self.farmer.og_pool_state

# Otherwise, send the proof of space to the pool
# When we win a block, we also send the partial to the pool
required_iters = calculate_iterations_quality(
self.farmer.constants.DIFFICULTY_CONSTANT_FACTOR,
computed_quality_string,
new_proof_of_space.proof.size,
og_pool_state.difficulty,
new_proof_of_space.sp_hash,
)
if required_iters >= self.farmer.iters_limit:
self.farmer.log.info(
f"Proof of space not good enough for OG pool difficulty of {og_pool_state.difficulty}"
)
return

# Submit partial to pool
is_eos = new_proof_of_space.signage_point_index == 0
payload = PartialPayload(
new_proof_of_space.proof,
new_proof_of_space.sp_hash,
is_eos,
self.farmer.pool_payout_address
)

# The plot key is 2/2 so we need the harvester's half of the signature
m_to_sign = payload.get_hash()
request = harvester_protocol.RequestSignatures(
new_proof_of_space.plot_identifier,
new_proof_of_space.challenge_hash,
new_proof_of_space.sp_hash,
[m_to_sign],
)
response: Any = await peer.request_signatures(request)
if not isinstance(response, harvester_protocol.RespondSignatures):
self.farmer.log.error(f"Invalid response from harvester: {response}")
return

assert len(response.message_signatures) == 1

plot_signature: Optional[G2Element] = None
for sk in self.farmer.get_private_keys():
pk = sk.get_g1()
if pk == response.farmer_pk:
agg_pk = ProofOfSpace.generate_plot_public_key(response.local_pk, pk)
assert agg_pk == new_proof_of_space.proof.plot_public_key
sig_farmer = AugSchemeMPL.sign(sk, m_to_sign, agg_pk)
plot_signature = AugSchemeMPL.aggregate([sig_farmer, response.message_signatures[0][1]])
assert AugSchemeMPL.verify(agg_pk, m_to_sign, plot_signature)
pool_sk = self.farmer.pool_sks_map[bytes(pool_public_key)]
authentication_signature = AugSchemeMPL.sign(pool_sk, m_to_sign)

assert plot_signature is not None
agg_sig: G2Element = AugSchemeMPL.aggregate([plot_signature, authentication_signature])

submit_partial = SubmitPartial(payload, agg_sig, og_pool_state.difficulty)
self.farmer.log.debug("Submitting partial to OG pool ..")
og_pool_state.last_partial_submit_timestamp = time.time()
submit_partial_response: Dict
try:
submit_partial_response = await self.farmer.pool_api_client.submit_partial(submit_partial)
except Exception as e:
self.farmer.log.error(f"Error submitting partial to OG pool: {e}")
return
self.farmer.log.debug(f"OG pool response: {submit_partial_response}")
if "error_code" in submit_partial_response:
if submit_partial_response["error_code"] == 5:
self.farmer.log.info(
"Local OG pool difficulty too low, adjusting to OG pool difficulty "
f"({submit_partial_response['current_difficulty']})"
)
og_pool_state.difficulty = uint64(submit_partial_response["current_difficulty"])
else:
self.farmer.log.error(
f"Error in OG pooling: {submit_partial_response['error_code'], submit_partial_response['error_message']}"
)
else:
self.farmer.log.info("The partial submitted to the OG pool was accepted")
og_pool_state.difficulty = uint64(submit_partial_response["current_difficulty"])

@api_request
@peer_required
async def respond_plots(self, _: harvester_protocol.RespondPlots, peer: ws.WSFlaxConnection):
Expand Down
Empty file added flax/farmer/pooling/__init__.py
Empty file.
5 changes: 5 additions & 0 deletions flax/farmer/pooling/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from flax.util.ints import uint64

constants = {
"POOL_SUB_SLOT_ITERS": uint64(36718720)
}
24 changes: 24 additions & 0 deletions flax/farmer/pooling/og_pool_protocol.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from dataclasses import dataclass

from blspy import G2Element
from flax.types.blockchain_format.proof_of_space import ProofOfSpace
from flax.types.blockchain_format.sized_bytes import bytes32
from flax.util.ints import uint64
from flax.util.streamable import streamable, Streamable


@dataclass(frozen=True)
@streamable
class PartialPayload(Streamable):
proof_of_space: ProofOfSpace
sp_hash: bytes32
end_of_sub_slot: bool
payout_address: str # The farmer can choose where to send the rewards. This can take a few minutes


@dataclass(frozen=True)
@streamable
class SubmitPartial(Streamable):
payload: PartialPayload
partial_aggregate_signature: G2Element # Sig of partial by plot key and pool key
difficulty: uint64
12 changes: 12 additions & 0 deletions flax/farmer/pooling/og_pool_state.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import time

from flax.util.ints import uint64


class OgPoolState:
difficulty: uint64
last_partial_submit_timestamp: float

def __init__(self, difficulty: uint64 = 1, last_partial_submit_timestamp: float = time.time()):
self.difficulty = difficulty
self.last_partial_submit_timestamp = last_partial_submit_timestamp
29 changes: 29 additions & 0 deletions flax/farmer/pooling/pool_api_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
from aiohttp import ClientSession, ClientTimeout

from flax.farmer.pooling.og_pool_protocol import SubmitPartial
from flax.server.server import ssl_context_for_root
from flax.ssl.create_ssl import get_mozilla_ca_crt

timeout = ClientTimeout(total=30)


class PoolApiClient:
base_url: str

def __init__(self, base_url: str) -> None:
self.base_url = base_url
self.ssl_context = ssl_context_for_root(get_mozilla_ca_crt())

async def get_pool_info(self):
async with ClientSession(timeout=timeout) as client:
async with client.get(f"{self.base_url}/pool_info", ssl=self.ssl_context) as res:
return await res.json()

async def submit_partial(self, submit_partial: SubmitPartial):
async with ClientSession(timeout=timeout) as client:
async with client.post(
f"{self.base_url}/partial",
json=submit_partial.to_json_dict(),
ssl=self.ssl_context
) as res:
return await res.json()
Loading

0 comments on commit 5b49033

Please sign in to comment.