In [5]:
from datetime import datetime

from pydantic import BaseModel, Field

class SimulationInput(BaseModel):
    asset: str = Field(default="BTC", description="The asset to simulate.")
    start_time: str = Field(
        default=datetime.now().isoformat(),
        description="The start time of the simulation.",
    )
    time_increment: int = Field(
        default=300, description="Time increment in seconds."
    )
    time_length: int = Field(
        default=86400, description="Total time length in seconds."
    )
    num_simulations: int = Field(
        default=1, description="Number of simulation runs."
    )

    class Config:
        arbitrary_types_allowed = True

In [42]:
simulation_input = SimulationInput(
        asset="BTC",
        time_increment=300,
        time_length=86400,
        num_simulations=3,
    )

In [7]:
from datetime import timedelta, timezone
def get_current_time() -> datetime:
    # Get current date and time
    return datetime.now(timezone.utc).replace(microsecond=0)

In [8]:
current_time = get_current_time()

In [9]:
def round_time_to_minutes(
    dt: datetime, in_seconds: int, extra_seconds=0
) -> datetime:
    """round validation time to the closest minute and add extra minutes

    Args:
        dt (datetime): request_time
        in_seconds (int): 60
        extra_seconds (int, optional): self.timeout_extra_seconds: 120. Defaults to 0.

    Returns:
        datetime: rounded-up datetime
    """
    # Define the rounding interval
    rounding_interval = timedelta(seconds=in_seconds)

    # Calculate the number of seconds since the start of the day
    seconds = (
        dt - dt.replace(hour=0, minute=0, second=0, microsecond=0)
    ).total_seconds()

    # Calculate the next multiple of time_increment in seconds
    next_interval_seconds = (
        (seconds // rounding_interval.total_seconds()) + 1
    ) * rounding_interval.total_seconds()

    # Get the rounded-up datetime
    rounded_time = (
        dt.replace(hour=0, minute=0, second=0, microsecond=0)
        + timedelta(seconds=next_interval_seconds)
        + timedelta(seconds=extra_seconds)
    )

    return rounded_time

In [10]:
start_time = round_time_to_minutes(current_time, 60, 120)
simulation_input.start_time = start_time.isoformat()
print("start_time", simulation_input.start_time)

start_time 2025-09-15T09:06:00+00:00


In [11]:
import requests
TOKEN_MAP = {
    "BTC": "e62df6c8b4a85fe1a67db44dc12de5db330f7ac66b72dc658afedf0f4a415b43",
    "ETH": "ff61491a931112ddf1bd8147cd1b641375f79f5825126d665480874634fd0ace",
    "XAU": "765d2ba906dbc32ca17cc11f5310a89e9ee1f6420508c63861f2f8ba4ee34bb2",
    "SOL": "ef0d8b6fda2ceba41da15d4095d1da392a0d2f8ed0c6c7bc0f4cfac8c280b56d",
}

pyth_base_url = "https://hermes.pyth.network/v2/updates/price/latest"


def get_asset_price(asset="BTC"):
    pyth_params = {"ids[]": [TOKEN_MAP[asset]]}
    response = requests.get(pyth_base_url, params=pyth_params)
    if response.status_code != 200:
        print("Error in response of Pyth API")
        return

    data = response.json()
    parsed_data = data.get("parsed", [])

    asset = parsed_data[0]
    price = int(asset["price"]["price"])
    expo = int(asset["price"]["expo"])

    live_price = price * (10**expo)

    return live_price


In [82]:
from keras.models import Sequential
from keras.layers import Dense, GRU, Dropout
from keras.optimizers import Adam
import pandas as pd
from sklearn.preprocessing import StandardScaler, MinMaxScaler
# Load model
    
model = Sequential()
model.add(GRU(units=288, return_sequences=True, input_shape=(289,1)))
model.add(Dropout(0.2))
# Second GRU layer with dropout
model.add(GRU(units=144, return_sequences=True))
model.add(Dropout(0.2))
# Third GRU layer with dropout
model.add(GRU(units=144, return_sequences=False))
model.add(Dropout(0.2))

# Output layer
model.add(Dense(units=289))

model.compile(optimizer=Adam(learning_rate=0.001), loss='mean_squared_error', metrics=['mae'])

In [91]:
import numpy as np 
a=0
def simulate_single_price_path(
    current_price, time_increment, time_length, sigma
):
    """
    Simulate a single crypto asset price path.
    """
    one_hour = 3600
    dt = time_increment / one_hour
    num_steps = int(time_length / time_increment)
    std_dev = sigma * np.sqrt(dt)
    price_change_pcts = np.random.normal(0, std_dev, size=num_steps)
    cumulative_returns = np.cumprod(1 + price_change_pcts)
    cumulative_returns = np.insert(cumulative_returns, 0, 1.0)
    price_path = current_price * cumulative_returns
    
    
    
    data = pd.read_csv('./Data/btcusd_5-min_data.csv')
    x = pd.DataFrame(data['Close'])[:289]
    X_scaler = StandardScaler()
    x = X_scaler.fit_transform(x)
    x = np.array(x)
    x = np.array([np.reshape(x, [289])])
    price_path =  model.predict(x)
    price_path = X_scaler.inverse_transform(price_path)
    print(price_path.shape)
    return np.reshape(price_path, [289])

def simulate_crypto_price_paths(
    current_price, time_increment, time_length, num_simulations, sigma
):
    """
    Simulate multiple crypto asset price paths.
    """

    price_paths = []
    for _ in range(num_simulations):
        print(_)
        price_path = simulate_single_price_path(
            current_price, time_increment, time_length, sigma
        )
        price_paths.append(price_path)

    return np.array(price_paths)

def convert_prices_to_time_format(prices, start_time, time_increment):
    """
    Convert an array of float numbers (prices) into an array of dictionaries with 'time' and 'price'.

    :param prices: List of float numbers representing prices.
    :param start_time: ISO 8601 string representing the start time.
    :param time_increment: Time increment in seconds between consecutive prices.
    :return: List of dictionaries with 'time' and 'price' keys.
    """
    start_time = datetime.fromisoformat(
        start_time
    )  # Convert start_time to a datetime object
    result = []

    for price_item in prices:
        single_prediction = []
        for i, price in enumerate(price_item):
            time_point = start_time + timedelta(seconds=i * time_increment)
            single_prediction.append(
                {"time": time_point.isoformat(), "price": price}
            )
        result.append(single_prediction)

    return result


In [51]:
def generate_simulations(
    asset="BTC",
    start_time: str = "",
    time_increment=300,
    time_length=86400,
    num_simulations=1,
    sigma=0.01,
):
    """
    Generate simulated price paths.

    Parameters:
        asset (str): The asset to simulate. Default is 'BTC'.
        start_time (str): The start time of the simulation. Defaults to current time.
        time_increment (int): Time increment in seconds.
        time_length (int): Total time length in seconds.
        num_simulations (int): Number of simulation runs.
        sigma (float): Standard deviation of the simulated price path.

    Returns:
        numpy.ndarray: Simulated price paths.
    """
    if start_time == "":
        raise ValueError("Start time must be provided.")

    current_price = get_asset_price(asset)
    print(current_price)
    if current_price is None:
        raise ValueError(f"Failed to fetch current price for asset: {asset}")

    if asset == "BTC":
        sigma *= 3
    elif asset == "ETH":
        sigma *= 1.25
    elif asset == "XAU":
        sigma *= 0.5
    elif asset == "SOL":
        sigma *= 0.75

    simulations = simulate_crypto_price_paths(
        current_price=current_price,
        time_increment=time_increment,
        time_length=time_length,
        num_simulations=num_simulations,
        sigma=sigma,
    )

    predictions = convert_prices_to_time_format(
        simulations.tolist(), start_time, time_increment
    )
    return predictions

In [59]:
import typing
def datetime_valid(dt_str) -> bool:
    try:
        datetime.fromisoformat(dt_str)
    except ValueError:
        return False
    return True
def validate_datetime(
    dt_str,
) -> typing.Tuple[datetime, typing.Optional[str]]:
    if not isinstance(dt_str, str):
        return (
            datetime.now(),
            f"Time format is incorrect: expected str, got {type(dt_str)}",
        )
    if not datetime_valid(dt_str):
        return (
            datetime.now(),
            f"Time format is incorrect: expected isoformat, got {dt_str}",
        )

    return datetime.fromisoformat(dt_str), None

def validate_responses(
    response,
    simulation_input: SimulationInput,
    request_time: datetime,
    process_time_str: typing.Optional[str],
) -> str:
    """
    Validate responses from miners.

    Return a string with the error message
    if the response is not following the expected format or the response is empty,
    otherwise, return "CORRECT".
    """
    # check the process time
    if process_time_str is None:
        return "time out or internal server error (process time is None)"

    received_at = request_time + timedelta(seconds=float(process_time_str))
    start_time = datetime.fromisoformat(simulation_input.start_time)
    if received_at > start_time:
        return f"Response received after the simulation start time: expected {start_time}, got {received_at}"

    # check if the response is empty
    if response is None or len(response) == 0:
        return "Response is empty"

    # check the number of paths
    if len(response) != simulation_input.num_simulations:
        return f"Number of paths is incorrect: expected {simulation_input.num_simulations}, got {len(response)}"

    for path in response:
        # check the number of time points
        expected_time_points = (
            simulation_input.time_length // simulation_input.time_increment + 1
        )
        if len(path) != expected_time_points:
            return f"Number of time points is incorrect: expected {expected_time_points}, got {len(path)}"

        # check the start time
        first_time = path[0].get("time", "")
        if first_time != simulation_input.start_time:
            return f"Start time is incorrect: expected {simulation_input.start_time}, got {first_time}"

        for i in range(1, len(path)):
            # check the time formats
            i_minus_one_str_time = path[i - 1].get("time", "")
            i_minus_one_datetime, error_message = validate_datetime(
                i_minus_one_str_time
            )
            if error_message:
                return error_message

            i_str_time = path[i].get("time", "")
            i_datetime, error_message = validate_datetime(i_str_time)
            if error_message:
                return error_message

            # check the time increment
            expected_delta = timedelta(seconds=simulation_input.time_increment)
            actual_delta = i_datetime - i_minus_one_datetime
            if actual_delta != expected_delta:
                return f"Time increment is incorrect: expected {expected_delta}, got {actual_delta}"

            # check the price format
            price = path[i].get("price")
            if not isinstance(price, (int, float)):
                return f"Price format is incorrect: expected int or float, got {type(price)}"

    return "CORRECT"


In [92]:
prediction = generate_simulations(
        simulation_input.asset,
        start_time=simulation_input.start_time,
        time_increment=simulation_input.time_increment,
        time_length=simulation_input.time_length,
        num_simulations=simulation_input.num_simulations,
    )
format_validation = validate_responses(
        prediction,
        simulation_input,
        datetime.fromisoformat(simulation_input.start_time),
        "0",
    )
format_validation

114881.90000000001
0
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 596ms/step
(1, 289)
1
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 479ms/step
(1, 289)
2
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 291ms/step
(1, 289)


'CORRECT'

In [17]:
import requests
import pandas as pd
import time

def get_historical_btc_data(days=365):
    """
    Fetches historical BTC price data for a specified number of days.
    
    Parameters:
    days (int): Number of days of historical data to fetch (max=365 for free tier)
    
    Returns:
    pandas.DataFrame: DataFrame with timestamp, price, and other metrics.
    """
    url = f"https://api.coingecko.com/api/v3/coins/bitcoin/market_chart"
    params = {
        'vs_currency': 'usd',
        'days': days,
        'interval': 'daily'  # 'daily' or 'hourly' for more granular data
    }
    
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
    }
    
    try:
        response = requests.get(url, params=params, headers=headers, timeout=30)
        response.raise_for_status()
        data = response.json()
        
        # Extract prices and convert to DataFrame
        prices = data['prices']
        df = pd.DataFrame(prices, columns=['timestamp', 'price'])
        
        # Convert timestamp to readable date
        df['date'] = pd.to_datetime(df['timestamp'], unit='ms')
        
        # Add other metrics if available
        if 'market_caps' in data:
            market_caps = data['market_caps']
            df['market_cap'] = [cap[1] for cap in market_caps]
        
        if 'total_volumes' in data:
            volumes = data['total_volumes']
            df['volume'] = [vol[1] for vol in volumes]
        
        # Reorder columns
        df = df[['timestamp', 'date', 'price', 'market_cap', 'volume']]
        
        return df
        
    except requests.exceptions.RequestException as e:
        print(f"Request failed: {e}")
        return None

# Get 30 days of BTC data
btc_data = get_historical_btc_data(days=1000)

if btc_data is not None:
    # Save to CSV
    btc_data.to_csv('btc_historical_data.csv', index=False)
    print("BTC historical data saved to 'btc_historical_data.csv'")
    print(f"Data shape: {btc_data.shape}")
    print(btc_data.head())
else:
    print("Failed to fetch BTC data")

Request failed: 401 Client Error: Unauthorized for url: https://api.coingecko.com/api/v3/coins/bitcoin/market_chart?vs_currency=usd&days=1000&interval=daily
Failed to fetch BTC data


In [13]:
import requests
import pandas as pd
from bs4 import BeautifulSoup

# Scrape subnet data from TAOStats
url = "https://taostats.io/subnets/"
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}

try:
    response = requests.get(url, headers=headers, timeout=10)
    response.raise_for_status()  # Raise an exception for HTTP errors
    
    # Parse the HTML content
    soup = BeautifulSoup(response.content, 'html.parser')
    
    # Find all tables on the page
    tables = soup.find_all('table')
    
    if not tables:
        print("No tables found on the page")
    else:
        # Try to extract data from each table until we find the right one
        for i, table in enumerate(tables):
            try:
                # Convert table to DataFrame
                df = pd.read_html(str(table))[0]
                
                # Check if this looks like the subnet table (has expected columns)
                if 'UID' in df.columns and 'Name' in df.columns:
                    print(f"Found subnet table at position {i}")
                    df.to_csv('taostats_subnets.csv', index=False)
                    print("Data successfully saved to taostats_subnets.csv")
                    break
            except Exception as e:
                print(f"Error processing table {i}: {e}")
                continue
        else:
            print("Could not find the subnet table with expected columns")
            
except requests.exceptions.RequestException as e:
    print(f"Request failed: {e}")
except Exception as e:
    print(f"An error occurred: {e}")

Request failed: 500 Server Error: Internal Server Error for url: https://taostats.io/subnets


In [14]:
import bittensor as bt
import pandas as pd

# Connect to Bittensor network
subtensor = bt.subtensor(network='finney')  # Use 'test' for testnet

# Get all subnets information
subnets = subtensor.get_all_subnets_info()
subnets


[SubnetInfo(netuid=0, rho=10, kappa=32767, difficulty=4611686018427387903, immunity_period=4096, max_allowed_validators=64, min_allowed_weights=0, max_weight_limit=65535, scaling_law_power=50, subnetwork_n=64, max_n=64, blocks_since_epoch=440, tempo=100, modality=0, connection_requirements={}, emission_value=0, burn=τ0.000500000, owner_ss58='5C4hrfjw9DjXZTzV3MwzrrAr9P1MJhSrvWGWqi1eSuyUpnhM'),
 SubnetInfo(netuid=1, rho=10, kappa=32767, difficulty=10000000, immunity_period=7200, max_allowed_validators=128, min_allowed_weights=8, max_weight_limit=65535, scaling_law_power=50, subnetwork_n=1024, max_n=1024, blocks_since_epoch=23, tempo=99, modality=0, connection_requirements={}, emission_value=0, burn=τ0.066468716, owner_ss58='5HCFWvRqzSHWRPecN7q8J6c7aKQnrCZTMHstPv39xL1wgDHh'),
 SubnetInfo(netuid=2, rho=10, kappa=32767, difficulty=1268010552201, immunity_period=5000, max_allowed_validators=64, min_allowed_weights=1, max_weight_limit=65535, scaling_law_power=50, subnetwork_n=256, max_n=256, 