# Bittensor Subtensor API Exploration for Stake Tracking

This notebook explores various methods available in the Bittensor Subtensor class to track:
1. Current stake values
2. Stake transactions (add_stake/remove_stake)
3. Epoch boundaries

The goal is to determine the most reliable methods to use in the vesting system's stake tracker.

In [1]:
# Import necessary libraries
import json
from datetime import datetime, timezone, timedelta
import time
import pandas as pd
import matplotlib.pyplot as plt
import bittensor as bt

# Configure display options
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', 100)
pd.set_option('display.width', 1000)

## 1. Connect to Bittensor Network

In [2]:
# Connect to Bittensor
def connect_subtensor(network="finney"):
    """Connect to Bittensor and return a Subtensor instance"""
    print(f"Connecting to {network} network...")
    
    # Create a subtensor connection
    subtensor = bt.subtensor(network=network)
    
    # Test connection by getting current block
    current_block = subtensor.get_current_block()
    print(f"Connected successfully. Current block: {current_block}")
    
    return subtensor

subtensor = connect_subtensor()

Connecting to finney network...
Connected successfully. Current block: 5055919


## 2. Utility Functions for Pretty Printing

In [3]:
# Pretty print results
def print_result(result, title=None):
    """Pretty print a result"""
    if title:
        print(f"\n=== {title} ===\n")
    
    if isinstance(result, dict):
        print(json.dumps(result, indent=2, default=str))
    elif hasattr(result, '__dict__'):
        # For objects, try to convert to dict
        try:
            print(json.dumps(result.__dict__, indent=2, default=str))
        except:
            print(result)
    elif isinstance(result, list):
        if len(result) > 5:
            print(f"List with {len(result)} items. Showing first 5:")
            for item in result[:5]:
                if hasattr(item, '__dict__'):
                    try:
                        print(json.dumps(item.__dict__, indent=2, default=str))
                    except:
                        print(item)
                else:
                    print(item)
        else:
            for item in result:
                if hasattr(item, '__dict__'):
                    try:
                        print(json.dumps(item.__dict__, indent=2, default=str))
                    except:
                        print(item)
                else:
                    print(item)
    else:
        print(result)

## 3. Examine Available Methods in Subtensor

In [4]:
# List all methods in Subtensor class related to stake, neurons, and subnets
all_methods = [method for method in dir(subtensor) if not method.startswith('_')]
filtered_methods = [method for method in all_methods 
                   if any(keyword in method.lower() 
                         for keyword in ['stake', 'neuron', 'subnet', 'validator', 'weight'])]

print(f"Found {len(filtered_methods)} relevant methods in Subtensor class:")
for method in sorted(filtered_methods):
    print(f"- {method}")

Found 43 relevant methods in Subtensor class:
- add_stake
- add_stake_multiple
- all_subnets
- commit_weights
- get_all_neuron_certificates
- get_all_subnets_info
- get_current_weight_commit_info
- get_hotkey_stake
- get_minimum_required_stake
- get_neuron_certificate
- get_neuron_for_pubkey_and_subnet
- get_stake
- get_stake_for_coldkey
- get_stake_for_coldkey_and_hotkey
- get_stake_for_hotkey
- get_stake_info_for_coldkey
- get_subnet_burn_cost
- get_subnet_hyperparameters
- get_subnet_reveal_period_epochs
- get_subnets
- get_total_subnets
- get_uid_for_hotkey_on_subnet
- is_hotkey_registered_on_subnet
- max_weight_limit
- min_allowed_weights
- move_stake
- neuron_for_uid
- neurons
- neurons_lite
- register_subnet
- reveal_weights
- root_set_weights
- set_subnet_identity
- set_weights
- subnet
- subnet_exists
- subnetwork_n
- swap_stake
- transfer_stake
- unstake
- unstake_multiple
- weights
- weights_rate_limit


## 4. Get Subnet Information

In [18]:
# Get subnets list
subnets = subtensor.get_subnets()
print(f"Found {len(subnets)} subnets")

# Display basic info for each subnet
for netuid in subnets:
    subnet_info = subtensor.subnet(netuid)
    print(subnet_info)

# Select a subnet for further exploration
# We'll use subnet 1 (typically the main Bittensor network) by default
NETUID = 30  # Change this to explore different subnets



Found 72 subnets
DynamicInfo(netuid=0, owner_hotkey='5C4hrfjw9DjXZTzV3MwzrrAr9P1MJhSrvWGWqi1eSuyUpnhM', owner_coldkey='5C4hrfjw9DjXZTzV3MwzrrAr9P1MJhSrvWGWqi1eSuyUpnhM', subnet_name='root', symbol='Τ', tempo=100, last_step=4919910, blocks_since_last_step=136063, emission=τ0.000000000, alpha_in=τ71,482.498436642, alpha_out=τ5,979,130.859390919, tao_in=τ5,979,133.568963266, price=τ1.000000000, k=427403405995890363737726392772, is_dynamic=False, alpha_out_emission=τ0.000000000, alpha_in_emission=τ0.000000000, tao_in_emission=τ0.000000000, pending_alpha_emission=τ0.000000000, pending_root_emission=τ0.000000000, network_registered_at=0, subnet_volume=τ3,492,357.147528417, subnet_identity=None, moving_price=0.0)
DynamicInfo(netuid=1, owner_hotkey='5HCFWvRqzSHWRPecN7q8J6c7aKQnrCZTMHstPv39xL1wgDHh', owner_coldkey='5HCFWvRqzSHWRPecN7q8J6c7aKQnrCZTMHstPv39xL1wgDHh', subnet_name='apex', symbol='α', tempo=99, last_step=5055897, blocks_since_last_step=76, emission=τ0.000000000, alpha_in=‎93,685.629

## 5. Get Neurons in the Subnet

In [21]:
# Get all neurons in the subnet
neurons = subtensor.neurons(netuid=NETUID)
print(f"Got {len(neurons)} neurons in subnet {NETUID}")


# Select a neuron UID for further exploration
# We'll use UID 0 by default
UID = 0  # Change this to explore different neurons

# Get detailed info for a specific neuron
neuron_info = subtensor.neuron_for_uid(netuid=NETUID, uid=UID)
print_result(neuron_info, f"Neuron {UID} Info")

# Extract hotkey for further queries
HOTKEY = neuron_info.hotkey

Got 256 neurons in subnet 30

=== Neuron 0 Info ===

{
  "hotkey": "5DdAdmAW4MCdSMDJhWPDvZKCbCAAbg8WDxvSMuqvv5nPVEjc",
  "coldkey": "5EZYsL3oA7Yq92NcuMnPPnwYRATQrTUvC9ydGiQU8MGfRfFN",
  "uid": 0,
  "netuid": 30,
  "active": false,
  "stake": "\u03c40.914256813",
  "stake_dict": {
    "5EZYsL3oA7Yq92NcuMnPPnwYRATQrTUvC9ydGiQU8MGfRfFN": "\u03c40.914256813"
  },
  "total_stake": "\u03c40.914256813",
  "rank": 0.00010681315327687495,
  "emission": 0.016468785,
  "incentive": 0.00010681315327687495,
  "consensus": 0.00010681315327687495,
  "trust": 0.9908598458838789,
  "validator_trust": 0.0,
  "dividends": 0.0,
  "last_update": 4212501,
  "validator_permit": false,
  "weights": [
    [
      0,
      29557
    ],
    [
      1,
      29557
    ],
    [
      2,
      48818
    ],
    [
      3,
      29557
    ],
    [
      4,
      13325
    ],
    [
      5,
      29557
    ],
    [
      6,
      29557
    ],
    [
      7,
      29557
    ],
    [
      8,
      29557
    ],
    [
  

## 6. Explore Stake Information Methods

In [None]:
# Get neuron stake by hotkey
neuron_stake = subtensor.get_neuron_stake_for_coldkey_and_hotkey(
    hotkey_ss58=HOTKEY,
    coldkey_ss58=neuron_info.coldkey  # If known, otherwise use neuron_info.coldkey from the previous cell
)
print(f"Stake for neuron {UID} with hotkey {HOTKEY}: {neuron_stake} τ")

# Get total stake for a hotkey
total_stake = subtensor.get_total_stake_for_hotkey(hotkey_ss58=HOTKEY)
print(f"Total stake for hotkey {HOTKEY} across all subnets: {total_stake} τ")

# Get total stake for a subnet
subnet_total_stake = subtensor.get_total_stake_for_subnet(netuid=NETUID)
print(f"Total stake in subnet {NETUID}: {subnet_total_stake} τ")

# For a validator, get the stake in a different way
if subtensor.is_hotkey_validator(hotkey_ss58=HOTKEY, netuid=NETUID):
    validator_stake = subtensor.get_stake_for_validator(netuid=NETUID, hotkey_ss58=HOTKEY)
    print(f"Validator stake for {HOTKEY}: {validator_stake} τ")

## 7. Explore Ownership and Stake Distribution

In [15]:
# Get coldkey owner of the hotkey
coldkey_owner = subtensor.get_hotkey_owner(hotkey_ss58=HOTKEY)
print(f"Coldkey owner of hotkey {HOTKEY}: {coldkey_owner}")

# Get all stakes for this coldkey
if coldkey_owner:
    stakes = subtensor.get_stakes_for_coldkey(coldkey_ss58=coldkey_owner)
    print(f"\nFound {len(stakes)} stake entries for coldkey {coldkey_owner}:")
    
    # Convert to DataFrame for better visualization
    stake_data = []
    for s in stakes:
        stake_data.append({
            'hotkey': s.hotkey, 
            'stake': float(s.stake),
            'stake_in_tao': float(s.stake) / 1e9
        })
    
    if stake_data:
        df = pd.DataFrame(stake_data)
        display(df)
        
        # Create a pie chart of stake distribution
        plt.figure(figsize=(10, 6))
        plt.pie(df['stake_in_tao'], labels=df['hotkey'], autopct='%1.1f%%')
        plt.title(f"Stake Distribution for Coldkey {coldkey_owner}")
        plt.show()

NameError: name 'HOTKEY' is not defined

## 8. Detect Epoch Boundaries

In [27]:
# Detect epoch boundaries using subnet tempo and current block
def check_epoch_boundary(subtensor, netuid):
    """Check if we're at an epoch boundary by examining subnet state"""
    try:
        # Get subnet info to get the tempo
        subnet_info = subtensor.subnet(netuid)
        tempo = subnet_info.tempo
        
        # Get current block
        current_block = subtensor.get_current_block()
        
        # Calculate blocks until next epoch
        blocks_since_epoch_start = current_block % tempo
        blocks_until_next_epoch = tempo - blocks_since_epoch_start
        
        return {
            'current_block': current_block,
            'tempo': tempo,
            'blocks_since_epoch_start': blocks_since_epoch_start,
            'blocks_until_next_epoch': blocks_until_next_epoch,
            'is_epoch_boundary': blocks_since_epoch_start == 0
        }
            
    except Exception as e:
        print(f"Error checking epoch boundary: {e}")
        return None

epoch_check_result = check_epoch_boundary(subtensor, NETUID)
print_result(epoch_check_result, "Epoch Boundary Check")


=== Epoch Boundary Check ===

{
  "current_block": 5056332,
  "tempo": 360,
  "blocks_since_epoch_start": 132,
  "blocks_until_next_epoch": 228,
  "is_epoch_boundary": false
}


## 9. Monitor Stake Changes Over Time

In [33]:
# Function to monitor stake changes over time
def monitor_stake_changes(subtensor, netuid,coldkey, interval_seconds=5, duration_minutes=1):
    """Monitor stake changes for a specific hotkey over time"""
    start_time = datetime.now(timezone.utc)
    end_time = start_time + timedelta(minutes=duration_minutes)
    
    stake_history = []
    
    print(f"Starting stake monitoring for hotkey {coldkey} in subnet {netuid}")
    print(f"Will run for {duration_minutes} minutes with {interval_seconds} second intervals")
    
    current_time = datetime.now(timezone.utc)
    while current_time < end_time:
        # Get current stake
        try:
            # Get current block
            current_block = subtensor.get_current_block()
            
            # Get stake
            stake_info= subtensor.get_stake_info_for_coldkey(coldkey)
            
            # Record timestamp and stake value
            stake_history.append({
                'timestamp': current_time,
                'block': current_block,
                'stake info': stake_info,
                
            })
            
            print(f"{current_time.strftime('%H:%M:%S')} - Block: {current_block}, Stake: {stake_info}")
            
            # Sleep until next interval
            time.sleep(interval_seconds)
            
        except Exception as e:
            print(f"Error monitoring stake: {e}")
            time.sleep(interval_seconds)
        
        current_time = datetime.now(timezone.utc)
    
    # Analyze and visualize results
    if stake_history:
        df = pd.DataFrame(stake_history)
        plt.figure(figsize=(10, 6))
        plt.plot(df['timestamp'], df['stake_in_tao'])
        plt.title(f"Stake Changes for {coldkey}")
        plt.xlabel("Time")
        plt.ylabel("Stake (τ)")
        plt.grid(True)
        plt.show()
        
        return df
    else:
        print("No stake history collected")
        return None

# Uncomment the following line to run actual monitoring
stake_df = monitor_stake_changes(subtensor, NETUID, HOTKEY)

Starting stake monitoring for hotkey 5DdAdmAW4MCdSMDJhWPDvZKCbCAAbg8WDxvSMuqvv5nPVEjc in subnet 30
Will run for 1 minutes with 5 second intervals
19:04:20 - Block: 5056353, Stake: []
19:04:26 - Block: 5056354, Stake: []


KeyboardInterrupt: 

## 10. Notes and Observations

Document your findings here as you experiment with the Subtensor methods:

1. Most effective method for getting current stake: `get_total_stake_for_hotkey` or `get_neuron_stake_for_coldkey_and_hotkey`
2. Detecting epoch boundaries: Can use subnet `tempo` and `current_block` to determine epoch boundaries
3. Challenges and limitations: Need to poll regularly to detect stake changes as there's no event subscription mechanism
4. Recommended implementation approach: Periodically query stake values and compare with previous values to detect changes

## 11. Summary and Implementation Plan

Based on the experiments in this notebook, here's an implementation plan for the stake tracking system:

### Implementation Plan

```python
class SubtensorStakeTracker:
    def __init__(self, subtensor, netuid, polling_interval=60):
        self.subtensor = subtensor
        self.netuid = netuid
        self.polling_interval = polling_interval
        self.last_check_block = None
        self.tracked_hotkeys = {}
        self.tracked_stakes = {}
        
    def add_tracking(self, hotkey):
        """Add a hotkey to track"""
        if hotkey not in self.tracked_hotkeys:
            self.tracked_hotkeys[hotkey] = {
                'tracked_since': datetime.now(timezone.utc),
                'last_stake': self.subtensor.get_total_stake_for_hotkey(hotkey_ss58=hotkey)
            }
    
    def check_stakes(self):
        """Check stake changes for tracked hotkeys"""
        current_block = self.subtensor.get_current_block()
        changes = []
        
        # Check each tracked hotkey
        for hotkey, data in self.tracked_hotkeys.items():
            current_stake = self.subtensor.get_total_stake_for_hotkey(hotkey_ss58=hotkey)
            
            # If stake changed, record it
            if current_stake != data['last_stake']:
                change = {
                    'hotkey': hotkey,
                    'previous_stake': data['last_stake'],
                    'current_stake': current_stake,
                    'change': current_stake - data['last_stake'],
                    'block': current_block,
                    'timestamp': datetime.now(timezone.utc)
                }
                changes.append(change)
                
                # Update last stake
                self.tracked_hotkeys[hotkey]['last_stake'] = current_stake
        
        self.last_check_block = current_block
        return changes
    
    def is_epoch_boundary(self):
        """Check if we're at an epoch boundary"""
        subnet_info = self.subtensor.get_subnet_info(self.netuid)
        tempo = subnet_info.tempo
        current_block = self.subtensor.get_current_block()
        
        return current_block % tempo == 0
```