# Sheet

# Memecoin Strategy based on Social Data

An example strategy backtest loading external social data to drive decision making in this automated memecoin strategy. Enter into trades where there is an uptick in mindshare

# Set up

Set up Trading Strategy data client.

In [None]:
from tradeexecutor.utils.notebook import setup_charting_and_output
from tradingstrategy.client import Client
from tradeexecutor.utils.notebook import setup_charting_and_output, OutputMode
# import ipdb

client = Client.create_jupyter_client()

# Set up drawing charts in interactive vector output mode.
# This is slower. See the alternative commented option below.
# Kernel restart needed if you change output mode.
# setup_charting_and_output(OutputMode.interactive)

# Set up rendering static PNG images.
# This is much faster but disables zoom on any chart.
setup_charting_and_output(OutputMode.static, image_format="png", width=1500, height=1000)

# Prerequisites

To run this backtest, you first need to run `scripts/prefilter-ethereum.py` to build a Polygon dataset (> 1 GB) for this backtest based on more than 10 GB downloaded data.

In [None]:
from pathlib import Path

import pandas as pd

from tradingstrategy.chain import ChainId
from tradingstrategy.client import Client
from tradingstrategy.timebucket import TimeBucket
import datetime

liquidity_output_fname = Path("/tmp/base-liquidity-prefiltered.parquet")
price_output_fname = Path("/tmp/base-price-prefiltered.parquet")

# If the pair does not have this liquidity, skip
min_prefilter_liquidity = 10_000

chain_id = ChainId.base
time_bucket = TimeBucket.d1
client = Client.create_jupyter_client()

# We need pair metadata to know which pairs belong to Polygon
print("Downloading/opening pairs dataset")
pairs_df = client.fetch_pair_universe().to_pandas()
our_chain_pair_ids = pairs_df[pairs_df.chain_id == chain_id.value]["pair_id"].unique()

print(f"We have data for {len(our_chain_pair_ids)} trading pairs on {chain_id.name}")

# Download all liquidity data, extract
# trading pairs that exceed our prefiltering threshold
print("Downloading/opening liquidity dataset")
liquidity_df = client.fetch_all_liquidity_samples(time_bucket).to_pandas()
print(f"Filtering out liquidity for chain {chain_id.name}")
liquidity_df = liquidity_df.loc[liquidity_df.pair_id.isin(our_chain_pair_ids)]
liquidity_per_pair = liquidity_df.groupby(liquidity_df.pair_id)
print(f"Chain {chain_id.name} has liquidity data for {len(liquidity_per_pair.groups)}")

liquidity_df.to_parquet(liquidity_output_fname)

print(f"Wrote {liquidity_output_fname}, {liquidity_output_fname.stat().st_size:,} bytes")

print("Downloading/opening OHLCV dataset")
price_df = client.fetch_all_candles(time_bucket).to_pandas()
price_df.to_parquet(price_output_fname)

print(f"Wrote {price_output_fname}, {price_output_fname.stat().st_size:,} bytes")

In [3]:
from pathlib import Path
import pandas as pd
# See scripts/prefilter-polygon.py

liquidity_output_fname = Path("/tmp/base-liquidity-prefiltered.parquet")
price_output_fname = Path("/tmp/base-price-prefiltered.parquet")

assert price_output_fname.exists(), "Run prefilter script first"
assert liquidity_output_fname.exists(), "Run prefilter script first"

# Custom data

Load the custom data from a CSV file.

- Load using Pandas
- This data will be split and mapped to per-pair indicators later on, as the data format is per-pair

*Note*: Relative paths work different in different notebook run-time environments. Below is for Visual Studio Code.

In [None]:
import requests
import pandas as pd
from typing import List, Dict, Union
from datetime import datetime

# Global variables
API_KEY = "6a1c9782-436a-47bc-bde6-c883088775a6"
BASE_URL = "https://api.qa.trendmoon.ai/"


def get_category_coins_scored(category: str = "Base Meme", top_n: int = 150) -> List[Dict]:
    """
    Get scored coins for a specific category from TrendMoon API and enrich with contract addresses.
    
    Args:
        category (str): Category to search for
        top_n (int): Number of top coins to retrieve
        
    Returns:
        List[Dict]: List of coin metadata with scores and contract addresses
    """
    # First get the scored data from new endpoint
    endpoint = "get_category_coins"
    
    headers = {
        'accept': 'application/json',
        'Api-key': API_KEY
    }
    
    params = {
        'category_name': category,
        'top_n': top_n
    }
    
    response = requests.get(
        f"{BASE_URL}{endpoint}",
        headers=headers,
        params=params
    )
    
    if response.status_code != 200:
        raise Exception(f"API request failed with status code {response.status_code}: {response.text}")
    
    scored_data = response.json()
    
    # Get contract addresses using existing function
    coins_with_contracts = get_category_coins(category=category)
    
    # Create a mapping of coin_id to contract address
    contract_map = {
        coin['id']: coin['contract_address'] 
        for coin in coins_with_contracts
    }
    
    # Enrich scored coins with contract addresses
    enriched_coins = []
    for coin in scored_data['coins']:
        coin_data = {
            'id': coin['coin_id'],
            'date': coin['date'],
            'name': coin['name'],
            'symbol': coin['symbol'],
            'score': coin['score'],
            'technical_indicator_score': coin['technical_indicator_score'],
            'social_indicator_score': coin['social_indicator_score'],
            'day_trend': coin['1_day_trend'],
            'day_perc_diff': coin['day_perc_diff'],
            'social_mentions': coin['social_mentions'],
            'social_dominance': coin['social_dominance'],
            'category_relative_social_dominance': coin['category_relative_social_dominance'],
            'mentions_ma': coin['mentions_ma'],
            'mentions_upper_band': coin['mentions_upper_band'],
            'social_ma_crossover': coin['social_ma_crossover'],
            'price_above_ma_20': coin['price_above_ma_20'],
            'price_above_ma_50': coin['price_above_ma_50'],
            'macd_above_signal': coin['macd_above_signal'],
            'price_momentum': coin['price_momentum'],
            'price_pct_change': coin['price_pct_change'],
            'volume_pct_change': coin['volume_pct_change'],
            'volume_ratio_20': coin['volume_ratio_20'],
            'total_volume': coin['total_volume'],
            'market_cap': coin['market_cap'],
            'fully_diluted_valuation': coin['fully_diluted_valuation'],
            'contract_address': contract_map.get(coin['coin_id'])  # Add contract address from mapping
        }
        enriched_coins.append(coin_data)
    
    # Add metadata about the request
    result = {
        'category_name': scored_data['category_name'],
        'date': scored_data['date'],
        'data_source': scored_data['data_source'],
        'top_n': scored_data['top_n'],
        'last_updated': scored_data['last_updated'],
        'coins': enriched_coins
    }
    
    return result

def get_category_coins(category: str = "Base Meme") -> List[Dict]:
    """
    Get coins for a specific category from TrendMoon API.
    
    Args:
        category (str): Category to search for
        
    Returns:
        List[Dict]: List of coin metadata
    """
    endpoint = "coins/search"
    
    headers = {
        'accept': 'application/json',
        'Api-key': API_KEY
    }
    
    params = {
        'category': category,
        'page': 1,
        'page_size': 100
    }
    
    response = requests.get(
        f"{BASE_URL}{endpoint}",
        headers=headers,
        params=params
    )
    
    if response.status_code != 200:
        raise Exception(f"API request failed with status code {response.status_code}: {response.text}")
    
    coins = response.json()
    
    # Extract only the fields we need
    extracted_coins = []
    for coin in coins:
        extracted_coin = {
            'id': coin['id'],
            'name': coin['name'],
            'symbol': coin['symbol'],
            'contract_address': coin['platforms'].get('base'),  # Get Base chain contract address
            'market_cap': coin['market_cap'],
            'fdv': coin['fully_diluted_valuation']
        }
        extracted_coins.append(extracted_coin)
    
    return extracted_coins

def get_coin_trends(
    coin_ids: List[str],
    start_date: str = "2025-01-01",
    end_date: str = "2025-05-15",
    interval: str = "1d"
) -> Dict[str, pd.DataFrame]:
    """
    Get trends data for specified coins from TrendMoon API.
    
    Args:
        coin_ids (List[str]): List of coin IDs to fetch trends for
        start_date (str): Start date in YYYY-MM-DD format
        end_date (str): End date in YYYY-MM-DD format
        interval (str): Data interval (e.g., "1d" for daily)
        
    Returns:
        Dict[str, pd.DataFrame]: Dictionary mapping coin IDs to their respective trend data DataFrames
    """
    endpoint = "social/trends"
    
    headers = {
        'accept': 'application/json',
        'Api-key': API_KEY
    }
    
    params = {
        'start_date': start_date,
        'end_date': end_date,
        'interval': interval,
        'coin_ids': coin_ids  
    }

    print('params', params)
    
    response = requests.get(
        f"{BASE_URL}{endpoint}",
        headers=headers,
        params=params
    )
    
    if response.status_code != 200:
        raise Exception(f"API request failed with status code {response.status_code}: {response.text}")
    
    trends_data = response.json()
    
    # Create a dictionary to store DataFrames for each coin
    coin_dfs = {}
    
    for coin_data in trends_data:
        coin_id = coin_data['coin_id']
        
        # Convert trend_market_data to DataFrame
        df = pd.DataFrame(coin_data['trend_market_data'])
        
        # Convert date column to datetime but keep it as a column
        df['date'] = pd.to_datetime(df['date'])
        
        # Store DataFrame in dictionary with coin_id as key
        coin_dfs[coin_id] = df
    
    return coin_dfs

def merge_coin_trends_data(coins: List[Dict], trends: Dict[str, pd.DataFrame]) -> pd.DataFrame:
    """
    Merge coin metadata with trends data into a single DataFrame.
    
    Args:
        coins (List[Dict]): List of coin metadata from get_category_coins
        trends (Dict[str, pd.DataFrame]): Dictionary of trend DataFrames from get_coin_trends
        
    Returns:
        pd.DataFrame: Combined DataFrame with MultiIndex (coin_id, date)
    """
    # Create a list to store all the data
    all_data = []
    
    for coin in coins:
        coin_id = coin['id']
        if coin_id in trends:
            df = trends[coin_id].copy()
            
            # Add coin metadata
            df['coin_id'] = coin_id
            df['name'] = coin['name']
            df['symbol'] = coin['symbol']
            df['contract_address'] = coin['contract_address']
            
            # Append to our list
            all_data.append(df)
    
    # Concatenate all DataFrames
    combined_df = pd.concat(all_data, ignore_index=True)
    
    # Set MultiIndex
    combined_df.set_index(['coin_id', 'date'], inplace=True)
    
    # Define column order
    column_order = [
        'sentiment_score', 'symbol_count', 'name_count', 'social_mentions',
        'social_dominance', 'price', 'market_cap', 'total_volume',
        'lc_posts_active', 'lc_interactions', 'lc_contributors_created',
        'lc_contributors_active', 'lc_social_volume_24h',
        'day_social_perc_diff', 'hour_social_perc_diff', 'name', 'symbol',
        'contract_address'
    ]
    
    # Ensure all columns exist
    for col in column_order:
        if col not in combined_df.columns:
            combined_df[col] = None
    
    # Reorder columns
    combined_df = combined_df[column_order]
    
    return combined_df

# Example usage:
# Get coins data
coins = get_category_coins(category="Base Meme")
coin_ids = [coin['id'] for coin in coins]

# Get trends data
coin_trends = get_coin_trends(coin_ids=coin_ids, start_date="2025-01-01", end_date="2025-05-15", interval="1d")

# Merge the data
df_trend = merge_coin_trends_data(coins, coin_trends)



In [None]:
df_trend.head()

In [None]:
# Example usage
scored_coins = get_category_coins_scored(category="Base Meme", top_n=150)

# Access the enriched data
for coin in scored_coins['coins']:
    print(f"Name: {coin['name']}")
    print(f"Score: {coin['score']}")
    print(f"Contract: {coin['contract_address']}")
    print("---")

In [None]:
custom_data_group.get_group('0xba5e66fb16944da22a62ea4fd70ad02008744460').index

In [None]:
NUM_TOKENS = 40


latest_data = df_trend[df_trend.index.get_level_values('date') == df_trend.index.get_level_values('date').max()]
latest_data.dropna(subset=['contract_address'], inplace=True)
token_list_backtest = latest_data.sort_values(by='total_volume', ascending=False)[['symbol', 'contract_address']].iloc[:NUM_TOKENS]


base_erc20_address_list = []
erc20_addresses_avoid = ['0xA3c322Ad15218fBFAEd26bA7f616249f7705D945'.lower()]
base_erc20_address_list += token_list_backtest['contract_address'].tolist()
base_erc20_address_list = [address for address in set(base_erc20_address_list) if address.lower() not in erc20_addresses_avoid]
print(f"length of base_erc20_address_list: {len(base_erc20_address_list)}")
# Create per-pair DataFrame group by
df_trend = df_trend.reset_index()
df_trend['date'] = df_trend['date'].dt.tz_localize(None)
df_trend = df_trend.set_index(['coin_id', 'date'])
custom_data_group = df_trend.reset_index().set_index('date').sort_index().groupby('contract_address')

# Parameters

- Strategy parameters define the fixed and grid searched parameters

In [6]:
from tradingstrategy.chain import ChainId
import datetime

from tradeexecutor.strategy.default_routing_options import TradeRouting
from tradingstrategy.timebucket import TimeBucket
from tradeexecutor.strategy.cycle import CycleDuration
from tradeexecutor.strategy.parameters import StrategyParameters

from skopt.space import Integer, Real, Categorical

class Parameters:
    """Parameteres for this strategy.

    - Collect parameters used for this strategy here

    - Both live trading and backtesting parameters
    """

    id = "base-sentimeme"  # Used in cache paths

    cycle_duration = CycleDuration.d1  # Daily rebalance
    candle_time_bucket = TimeBucket.d1
    allocation = 0.142
    max_assets = 7

    #
    # Liquidity risk analysis and data quality
    #
    min_price = 0.00000000000000000001
    max_price = 1_000_000
    min_liquidity_trade_threshold = 0.05
    min_liquidity_threshold = 25000
    min_volume = 30000

    # Trigger
    # Safety Guards
    minimum_mometum_threshold = 0.1
    momentum_lookback_bars = 9

    sma_length = 12
    social_ema_short_length = 6
    social_ema_long_length = 11
    cross_over_period = 2
    social_ma_min = 10

    stop_loss_pct = 0.92
    trailing_stop_loss_pct = 0.86
    trailing_stop_loss_activation_level = 1.3

    # Trade execution parameters
    slippage_tolerance = 0.06
    max_buy_tax = 0.06
    max_sell_tax = 0.06
    token_risk_threshold = 50
    # If the pair does not have enough real time quote token TVL, skip trades smaller than this
    min_trade_size_usd = 1.00
    # Only do trades where we are less than 1% of the pool quote token TVL
    per_position_cap_of_pool = 0.01

    #
    # Live trading only
    #
    chain_id = ChainId.base
    routing = TradeRouting.default
    required_history_period = datetime.timedelta(days=30 + 1)

    #
    # Backtesting only
    #
    backtest_start = datetime.datetime(2025, 1, 1)
    backtest_end = datetime.datetime(2025, 5, 20)
    initial_cash = 10_000
    initial_deposit = 10_000

    stop_loss_time_bucket = TimeBucket.h1


parameters = StrategyParameters.from_class(Parameters)  # Convert to AttributedDict to easier typing with dot notation
# parameters = StrategyParameters.from_class(Parameters, grid_search=True)  # Convert to AttributedDict to easier typing with dot notation

# Trading pairs and market data

- Get a list of ERC-20 tokens we are going to trade on Polygon
- Trading pairs are automatically mapped to the best volume /USDC or /WMATIC pair
    - Limited to current market information - no historical volume/liquidity analyses performed here
- This data loading method caps out at 75 trading pairs

In [7]:

def filter_pairs_by_risk(
    pairs_df: pd.DataFrame,
    risk_threshold: int = 60,
    max_buy_tax: float = 6.0,
    max_sell_tax: float = 6.0,
    risk_traits: dict = None,
) -> pd.DataFrame:
    """Filter pairs DataFrame based on tax rates, TokenSniffer risk score, and specific risk traits.

    Args:
        pairs_df (pd.DataFrame): DataFrame containing trading pair information
        risk_threshold (int): Minimum acceptable TokenSniffer risk score (0-100, higher is better)
        max_buy_tax (float): Maximum allowed buy tax percentage (default 6.0)
        max_sell_tax (float): Maximum allowed sell tax percentage (default 6.0)
        risk_traits (dict): Dictionary of risk traits to filter on. If None, only tax and risk score are checked

    Returns:
        pd.DataFrame: Filtered pairs DataFrame containing only pairs meeting all criteria

    Example Risk Traits Dictionary:
    ```python
    # Complete risk traits dictionary
    risk_traits = {
        # Contract-level risks
        'has_mint': False,                    # Can new tokens be minted
        'has_fee_modifier': False,            # Can fees be modified after deployment
        'has_max_transaction_amount': False,   # Presence of max transaction limits
        'has_blocklist': False,               # Can addresses be blacklisted
        'has_proxy': False,                   # Is it a proxy contract (upgradeable)
        'has_pausable': False,                # Can trading be paused

        # Ownership and control risks
        'is_ownership_renounced': True,       # Ownership should be renounced
        'is_source_verified': True,           # Contract should be verified

        # Trading risks
        'is_sellable': True,                  # Token can be sold
        'has_high_buy_fee': False,            # High buy fees present
        'has_high_sell_fee': False,           # High sell fees present
        'has_extreme_fee': False,             # Extremely high fees (>30%)

        # Liquidity risks
        'has_inadequate_liquidity': False,    # Insufficient liquidity
        'has_inadequate_initial_liquidity': False,  # Started with low liquidity

        # Token distribution risks
        'has_high_creator_balance': False,    # Creator holds large portion
        'has_high_owner_balance': False,      # Owner holds large portion
        'has_high_wallet_balance': False,     # Any wallet holds too much
        'has_burned_exceeds_supply': False,   # Burned amount > supply (impossible)

        # Additional safety checks
        'is_flagged': False,                  # Token is flagged for issues
        'is_honeypot': False,                 # Known honeypot
        'has_restore_ownership': False,       # Can ownership be restored
        'has_non_standard_erc20': False       # Non-standard ERC20 implementation
    }
    ```

    Example Risk Profiles:
    ```python
    # Conservative (strict) settings
    conservative_risk_traits = {
        'has_mint': False,
        'has_fee_modifier': False,
        'has_blocklist': False,
        'has_proxy': False,
        'has_pausable': False,
        'is_ownership_renounced': True,
        'is_source_verified': True,
        'is_sellable': True,
        'has_high_buy_fee': False,
        'has_high_sell_fee': False,
        'is_flagged': False,
        'is_honeypot': False
    }

    # Moderate settings
    moderate_risk_traits = {
        'has_mint': False,
        'is_source_verified': True,
        'is_sellable': True,
        'has_extreme_fee': False,
        'is_honeypot': False,
        'is_flagged': False
    }

    # Aggressive settings
    aggressive_risk_traits = {
        'is_sellable': True,
        'is_honeypot': False,
        'is_flagged': False
    }
    ```

    Usage:
    ```python
    # Using conservative settings with custom tax limits
    filtered_df = filter_pairs_by_risk(
        pairs_df,
        risk_threshold=60,
        max_buy_tax=5.0,
        max_sell_tax=5.0,
        risk_traits=conservative_risk_traits
    )

    # Custom risk profile
    custom_risk_traits = {
        'is_sellable': True,
        'is_honeypot': False,
        'has_mint': False,
        'has_extreme_fee': False,
        'is_source_verified': True
    }
    filtered_df = filter_pairs_by_risk(
        pairs_df,
        risk_threshold=70,
        max_buy_tax=3.0,
        max_sell_tax=3.0,
        risk_traits=custom_risk_traits
    )
    ```
    """
    # Create a copy to avoid modifying original
    filtered_df = pairs_df.copy()
    initial_count = len(filtered_df)

    # Replace NaN values with 0 for buy_tax and sell_tax
    filtered_df["buy_tax"] = filtered_df["buy_tax"].fillna(0)
    filtered_df["sell_tax"] = filtered_df["sell_tax"].fillna(0)

    # Filter for pairs meeting tax thresholds
    filtered_df = filtered_df[
        (filtered_df["buy_tax"] <= max_buy_tax)
        & (filtered_df["sell_tax"] <= max_sell_tax)
    ]

    after_tax_count = len(filtered_df)
    print(f"After tax filter we have {after_tax_count} trading pairs")

    def check_token_risk(row):
        try:
            # Extract TokenSniffer data from the nested structure
            token_data = row["other_data"]["top_pair_data"].token_sniffer_data
            if token_data is None:
                return False

            print(f"Token data: {token_data}")
            # Check risk score threshold
            if token_data.get("riskScore", 0) < risk_threshold:
                return False

            # Check each specified risk trait if provided
            if risk_traits:
                for trait, desired_value in risk_traits.items():
                    if token_data.get(trait, not desired_value) != desired_value:
                        return False

            return True

        except (KeyError, AttributeError) as e:
            print(f"Error processing row: {e}")
            return False

    # Apply TokenSniffer filters if risk_traits provided
    if risk_traits is not None:
        filtered_df = filtered_df[filtered_df.apply(check_token_risk, axis=1)]

    final_count = len(filtered_df)

    print(
        "Filtering results: Initial pairs: %d, after tax filters: %d, after risk filters: %d",
        initial_count,
        after_tax_count,
        final_count,
    )

    return filtered_df



In [None]:
from tradingstrategy.universe import Universe
from tradingstrategy.liquidity import GroupedLiquidityUniverse
from tradeexecutor.strategy.pandas_trader.alternative_market_data import resample_multi_pair
from tradingstrategy.candle import GroupedCandleUniverse
from tradingstrategy.pair import filter_for_base_tokens, PandasPairUniverse, StablecoinFilteringMode, \
    filter_for_stablecoins
from tradingstrategy.client import Client

from tradeexecutor.strategy.trading_strategy_universe import TradingStrategyUniverse, translate_token
from tradeexecutor.strategy.execution_context import ExecutionContext, notebook_execution_context
from tradeexecutor.strategy.universe_model import UniverseOptions
from datetime import timedelta
from tradingstrategy.pair import (
    HumanReadableTradingPairDescription,
    PandasPairUniverse,
    StablecoinFilteringMode,
    filter_for_base_tokens,
    filter_for_stablecoins,
    filter_for_quote_tokens
)
from tradingstrategy.timebucket import TimeBucket
from tradingstrategy.universe import Universe
from tradingstrategy.utils.token_filter import (
    deduplicate_pairs_by_volume,
    add_base_quote_address_columns,
)
from tradingstrategy.utils.token_extra_data import load_extra_metadata

USDC = "0x833589fCD6eDb6E08f4c7C32D4f71b54bdA02913".lower()
WETH = "0x4200000000000000000000000000000000000006".lower()

#If liquidity 

# We care only Quickswap and Uniswap v3 pairs.
# ApeSwap is mostly dead, but listed many bad tokens 
# in the past, so it is good to include in the sample set.
SUPPORTED_DEXES = {"uniswap-v3", "uniswap-v2", "sushiswap", "aerodrome"}


avoid_backtesting_tokens = {
    # Trading jsut stops (though there is liq left)
    # https://tradingstrategy.ai/trading-view/ethereum/uniswap-v3/id-usdc-fee-30
    "PEOPLE",
    "WBTC"
}

# Get the token list of everything in the CSV + hardcoded WMATIC
custom_data_token_set = {WETH} | set(base_erc20_address_list)


def create_trading_universe(
    timestamp: datetime.datetime,
    client: Client,
    execution_context: ExecutionContext,
    universe_options: UniverseOptions,
) -> TradingStrategyUniverse:
    """Create the trading universe."""
    start_at = parameters.backtest_start
    end_at = parameters.backtest_end

    print(f"Backtesting {start_at} - {end_at}")

    chain_id = Parameters.chain_id

    exchange_universe = client.fetch_exchange_universe()
    
    exchange_universe = exchange_universe.limit_to_chains({Parameters.chain_id}).limit_to_slugs(SUPPORTED_DEXES)
    print(f"Exchange universe: {exchange_universe.get_all_slugs()}")
    print(f"We support {exchange_universe.get_exchange_count()} DEXes")
    
# 6239
# 6294    21461 uniswap v2
# 6091     1135 baseswap
# 6179      482 sushiswap
# 6051      392


    pairs_df = client.fetch_pair_universe().to_pandas()

    liquidity_df = pd.read_parquet(liquidity_output_fname)
    price_df = pd.read_parquet(price_output_fname)
    print(f"Length of price_df: {len(price_df)}")

    # # When reading from Parquet file, we need to deal with indexing by hand
    liquidity_df.index = pd.DatetimeIndex(liquidity_df.timestamp)
    price_df.index = pd.DatetimeIndex(price_df.timestamp)

    print(f"Prefilter data contains {len(liquidity_df):,} liquidity samples dn {len(price_df):,} OHLCV candles")
    print(f"type of start_at: {type(start_at)}")
    print(f"type of end_at: {type(end_at)}")

    # Crop price and liquidity data to our backtesting range
    price_df = price_df.loc[(price_df.timestamp >= start_at) & (price_df.timestamp <= end_at)]
    print(f"Price data range {price_df.timestamp.min()} - {price_df.timestamp.max()}")
    liquidity_df = liquidity_df.loc[(liquidity_df.timestamp >= start_at) & (liquidity_df.timestamp <= end_at)]

    # Prefilter for more liquidity conditions
    liquidity_per_pair = liquidity_df.groupby(liquidity_df.pair_id)
    print(f"Chain {chain_id.name} has liquidity data for {len(liquidity_per_pair.groups)}")

    #Uncomment for highest liqudiity pairs
    # passed_pair_ids = set()
    # for pair_id, pair_df in liquidity_per_pair:
    #     if pair_df["high"].max() > Parameters.min_liquidity_threshold:
    #         passed_pair_ids.add(pair_id)

    # Get the date 30 days ago
    thirty_days_ago = end_at - timedelta(days=30)

    # Create a subset of the data for the last 30 days, in live trading this makes more sense
    # liquidity_last_30_days = liquidity_df #liquidity_df[liquidity_df['timestamp'] > thirty_days_ago]
    # liquidity_per_pair_last_30d = liquidity_last_30_days.groupby('pair_id')
    # passed_pair_ids = set()
    # for pair_id, pair_df in liquidity_per_pair_last_30d:
    #     # Check the maximum high liquidity in the last 30 days
    #     if pair_df["high"].max() > Parameters.min_liquidity_threshold:
    #         passed_pair_ids.add(pair_id)

    # pairs_df = pairs_df.loc[pairs_df.pair_id.isin(passed_pair_ids)]

    # pairs_df.to_csv('pairs_df_post_liquidity_filter.csv')
    # print(f"There are {len(passed_pair_ids)} after liquidity filter")
    print(f"After liquidity filter {Parameters.min_liquidity_threshold:,} USD we have {len(pairs_df)} trading pairs")


    # allowed_exchange_ids = set(exchange_universe.exchanges.keys()) | {6294}
    allowed_exchange_ids = set(exchange_universe.exchanges.keys())
    print(f"Allowed exchange ids: {exchange_universe.exchanges.keys()}")
    pairs_df.to_csv('pairs_df_pre_exchange_filter.csv')
    pairs_df = pairs_df.loc[pairs_df.exchange_id.isin(allowed_exchange_ids)]
    print(f"After DEX filter we have {len(pairs_df)} trading pairs")
    pairs_df.to_csv('pairs_df_after_exchange_filter.csv')

    # Store reference USDC ETH pair so we have an example pair with USDC as quote token for reserve asset
    # Get ETH-USDC pairs from all major DEXes
    eth_usdc_addresses = [
        "0x88A43bbDF9D098eEC7bCEda4e2494615dfD9bB9C",  # Uniswap V2,
        "0xd0b53D9277642d899DF5C87A3966A349A798F224"  # Uniswap V3
    ]
    print(f"ETH-USDC pairs: {eth_usdc_addresses}")
    
    ref_usdc_pairs = pairs_df[
        pairs_df["address"].isin([addr.lower() for addr in eth_usdc_addresses])
    ].copy()

    # Pairs pre-processing
    pairs_df = add_base_quote_address_columns(pairs_df)
    pairs_df = pairs_df.loc[
        (pairs_df["base_token_address"].isin(base_erc20_address_list))
        & (pairs_df["chain_id"] == chain_id)
    ]

    pairs_df = filter_for_base_tokens(pairs_df, list(base_erc20_address_list))

    print(f"Before deduplication we have {len(pairs_df)} trading pairs")
    pairs_df = deduplicate_pairs_by_volume(pairs_df)
    print(f"After deduplication we have {len(pairs_df)} trading pairs")

    # Retrofit TokenSniffer data
    pairs_df = load_extra_metadata(
        pairs_df,
        client=client,
    )

    pairs_df = filter_pairs_by_risk(
        pairs_df,
        risk_threshold=Parameters.token_risk_threshold,
        max_buy_tax=Parameters.max_buy_tax,
        max_sell_tax=Parameters.max_sell_tax,
        risk_traits=None,
    )

    pairs_df = filter_for_stablecoins(pairs_df, StablecoinFilteringMode.only_volatile_pairs)
    print(f"After custom data ERC-20 token address cross section filter we have {len(pairs_df)} matching trading pairs")
    
    # We want to keep only USDC or WETH quoted pairs
    USDC = "0x833589fCD6eDb6E08f4c7C32D4f71b54bdA02913".lower()
    WETH = "0x4200000000000000000000000000000000000006".lower()
    pairs_df = filter_for_quote_tokens(
        pairs_df,
        {
            USDC,
            WETH,
        },
    )

    ref_usdc_pairs.to_csv('ref_usdc_pairs.csv')

    print(f"Before deduplication we have {len(pairs_df)} trading pairs")
    pairs_df = deduplicate_pairs_by_volume(pairs_df)
    print(f"After deduplication we have {len(pairs_df)} trading pairs")

    pairs_df = pd.concat([pairs_df, ref_usdc_pairs]).drop_duplicates(subset=["pair_id"])
    print(f"After adding ref USDC pairs we have {len(pairs_df)} trading pairs")

    pairs_df.to_csv('pairs_df_base_quote_token_filter.csv')
    

    # Resample strategy decision candles to daily
    daily_candles = resample_multi_pair(price_df, Parameters.candle_time_bucket)
    daily_candles["timestamp"] = daily_candles.index

    print(f"After downsampling we have {len(daily_candles)} OHLCV candles and {len(liquidity_df)} liquidity samples")
    candle_universe = GroupedCandleUniverse(
        daily_candles,
        time_bucket=Parameters.candle_time_bucket,
        forward_fill=True  # Forward will should make sure we can always calculate RSI, other indicators
    )

    liquidity_universe = GroupedLiquidityUniverse(liquidity_df)

    # The final trading pair universe contains metadata only for pairs that passed
    # our filters
    pairs_universe = PandasPairUniverse(pairs_df, exchange_universe=exchange_universe)
    stop_loss_candle_universe = GroupedCandleUniverse(price_df)

    data_universe = Universe(
        time_bucket=Parameters.candle_time_bucket,
        # liquidity_time_bucket=Parameters.candle_time_bucket,
        exchange_universe=exchange_universe,
        pairs=pairs_universe,
        candles=candle_universe,
        liquidity=liquidity_universe,
        chains={Parameters.chain_id},
        forward_filled=True,
    )

    reserve_asset = translate_token(pairs_universe.get_token(USDC))

    _strategy_universe = TradingStrategyUniverse(
        data_universe=data_universe,
        backtest_stop_loss_time_bucket=Parameters.stop_loss_time_bucket,
        backtest_stop_loss_candles=stop_loss_candle_universe,
        reserve_assets={reserve_asset},
        price_data_delay_tolerance=pd.Timedelta(days=7)
    )

    return _strategy_universe, pairs_universe


strategy_universe, pairs_universe = create_trading_universe(
    None,
    client,
    notebook_execution_context,
    UniverseOptions.from_strategy_parameters_class(Parameters, notebook_execution_context)
    
)

broken_trading_pairs = set()

#
# Extra sanity checks
# 
# Ru some extra sanity check for small cap tokens
#

print("Checking trading pair quality")
print("-" * 80)
pairs_to_avoid = [87449]

for pair in strategy_universe.iterate_pairs():
    reason = strategy_universe.get_trading_broken_reason(pair, min_candles_required=10, min_price=parameters.min_price, max_price=parameters.max_price)
    if pair.internal_id in pairs_to_avoid:
        broken_trading_pairs.add(pair)
    if reason:
        print(f"FAIL: {pair} with base token {pair.base.address} may be problematic: {reason}")
        broken_trading_pairs.add(pair)
    else:
        print(f"OK: {pair} included in the backtest")

print(f"Total {len(broken_trading_pairs)} broken trading pairs detected, having {strategy_universe.get_pair_count() - len(broken_trading_pairs)} good pairs left to trade")

# Indicators

- We use `pandas_ta` Python package to calculate technical indicators
- These indicators are precalculated and cached on the disk
- This includes caching our custom made indicators, so we only calculate them once

In [9]:
import pandas as pd
import pandas_ta
import datetime
from tradeexecutor.analysis.regime import Regime
from tradeexecutor.state.identifier import TradingPairIdentifier
from tradeexecutor.strategy.execution_context import ExecutionContext
from tradeexecutor.strategy.pandas_trader.indicator import IndicatorSet, IndicatorSource
from tradeexecutor.strategy.parameters import StrategyParameters
from tradeexecutor.strategy.trading_strategy_universe import TradingStrategyUniverse
from tradeexecutor.utils.crossover import contains_cross_over, contains_cross_under
import pandas as pd


def add_metric(pair: TradingPairIdentifier, metric_name: str) -> pd.Series:
    """
    Add a specific metric to the dataset, filling in missing dates and forward filling values.
    Handles duplicate dates by summing the values.
    """
    contract_address = pair.base.address
    try:
        contract_address = pair.base.address
        per_pair = custom_data_group.get_group(contract_address)
        per_pair_series = per_pair[metric_name]
        metric_series = per_pair_series[
            ~per_pair_series.index.duplicated(keep="last")
        ].rename(metric_name)
        full_date_range = pd.date_range(
            start=metric_series.index.min(), end=metric_series.index.max(), freq="D"
        )
        metric_series = metric_series.reindex(full_date_range)
        metric_series = metric_series.ffill()
        metric_series.sort_index(inplace=True)
        return metric_series
    except Exception as e:
        print(f"Error adding metric {metric_name} for pair {pair}: {str(e)}")
        return pd.Series(dtype="float64", index=pd.DatetimeIndex([]))

def calculate_metric_emas(pair: TradingPairIdentifier, metric_name: str, short_length: int, long_length: int) -> pd.DataFrame:
    """Calculate short and long EMAs for a specific metric."""
    metric_series = add_metric(pair, metric_name)
    metric_series = metric_series.interpolate(method='linear', limit_direction='forward')
    ema_short = metric_series.ewm(span=short_length, adjust=False).mean().rename(f"{metric_name}_ema_short")
    ema_long = metric_series.ewm(span=long_length, adjust=False).mean().rename(f"{metric_name}_ema_long")
    emas_df = pd.concat([ema_short, ema_long], axis=1)
    return emas_df


def calculate_metric_bbands(pair: TradingPairIdentifier, metric_name: str, length: int, std: int) -> pd.DataFrame:
    """Calculate Bollinger Bands for a specific metric."""
    try:
        metric_series = add_metric(pair, metric_name)
        metric_series.sort_index(inplace=True)
        metric_series = metric_series.interpolate(method='linear', limit_direction='forward')
        
        print(f"Length of metric series for {metric_name}: {len(metric_series)}")
        
        # Calculate Bollinger Bands
        bb = pandas_ta.bbands(metric_series, length=length, std=std)
        bb = bb.rename(columns={
            f'BBL_{length}_{std}': f'{metric_name}_BBL_{length}_{std}',
            f'BBM_{length}_{std}': f'{metric_name}_BBM_{length}_{std}',
            f'BBU_{length}_{std}': f'{metric_name}_BBU_{length}_{std}',
            f'BBB_{length}_{std}': f'{metric_name}_BBB_{length}_{std}',
            f'BBP_{length}_{std}': f'{metric_name}_BBP_{length}_{std}'
        })
        
        return bb
    except Exception as e:
        print(f"Error calculating Bollinger Bands for {metric_name}: {str(e)}")
        # Return an empty DataFrame in case of error
        return pd.Series(dtype="float64", index=pd.DatetimeIndex([]))
    
def momentum(close, momentum_lookback_bars) -> pd.Series:
    """Calculate momentum series to be used as a signal.

    This indicator is later processed in decide_trades() to a weighted alpha signal.
    
    :param momentum_lookback_bars:
        Calculate returns based on this many bars looked back
    """
    # https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.shift.html#pandas.DataFrame.shift
    start_close = close.shift(momentum_lookback_bars)
    momentum = (close - start_close) / start_close
    return momentum


def calculate_macd(price_data: pd.Series, fast: int = 12, slow: int = 26, signal: int = 9) -> pd.DataFrame:
    # Calculate the MACD and signal line
    exp1 = price_data.ewm(span=fast, adjust=False).mean()
    exp2 = price_data.ewm(span=slow, adjust=False).mean()
    macd_line = exp1 - exp2
    signal_line = macd_line.ewm(span=signal, adjust=False).mean()
    histogram = macd_line - signal_line
    
    return pd.DataFrame({
        'macd': macd_line,
        'signal': signal_line,
        'histogram': histogram
    })

def calculate_rsi(close, length=14):
    """Calculate RSI"""
    delta = close.diff()
    gain = (delta.where(delta > 0, 0)).rolling(window=length).mean()
    loss = (-delta.where(delta < 0, 0)).rolling(window=length).mean()
    rs = gain / loss
    return 100 - (100 / (1 + rs))

def calculate_obv(close: pd.Series, volume: pd.Series) -> pd.Series:
    """Calculates the On Balance Volume (OBV) indicator.
    
    Args:
        close: Series of closing prices
        volume: Series of volume values
        
    Logic:
        - First point: OBV = Volume
        - If close price increases: OBV = previous OBV + Volume
        - If close price decreases: OBV = previous OBV - Volume
        - If close price unchanged: OBV = previous OBV
        
    Returns:
        pd.Series: OBV values with same index as inputs
    """
    assert len(close) == len(volume), "Close prices and volume must have same length"
    assert all(volume > 0), "Volume must be positive values"

    obv = pd.Series(index=close.index, dtype=float)
    obv.iloc[0] = volume.iloc[0]
    
    for i in range(1, len(close)):
        if close.iloc[i] > close.iloc[i-1]:
            obv.iloc[i] = obv.iloc[i-1] + volume.iloc[i]
        elif close.iloc[i] < close.iloc[i-1]:
            obv.iloc[i] = obv.iloc[i-1] - volume.iloc[i]
        else:
            obv.iloc[i] = obv.iloc[i-1]
    
    return obv

def create_indicators(
    timestamp: datetime.datetime | None,
    parameters: StrategyParameters,
    strategy_universe: TradingStrategyUniverse,
    execution_context: ExecutionContext
):
    indicators = IndicatorSet()

    indicators.add(
        "momentum",
        momentum,
        {"momentum_lookback_bars": parameters.momentum_lookback_bars},
        IndicatorSource.close_price,
    ) 

    indicators.add(
        "macd",
        calculate_macd,
        {"fast": 12, "slow": 26, "signal": 9},
        IndicatorSource.close_price,
    )

    # Add RSI
    indicators.add(
        "rsi",
        calculate_rsi,
        {"length": 14},
        IndicatorSource.close_price,
    )

    # Add OBV
    # indicators.add(
    #     "obv",
    #     calculate_obv,
    #     {},
    #     IndicatorSource.ohlcv,
    # )


    #Social Metrics
    # Add original value indicators
    social_metrics = [
        "social_mentions"
    ]

    for metric in social_metrics:
        print('social_metric going for', metric)
        indicators.add(
            metric,
            add_metric,
            {"metric_name": metric},  # Pass the metric name as a parameter
            IndicatorSource.external_per_pair,
        )

        # Add EMA indicators
        indicators.add(
            f"{metric}_emas",
            calculate_metric_emas,
            {
                "metric_name": metric,
                "short_length": parameters.social_ema_short_length,
                "long_length": parameters.social_ema_long_length
            },
            IndicatorSource.external_per_pair,
        )

    return indicators

# Trading algorithm

- Describe out trading strategy as code

In [10]:
from tradeexecutor.state.visualisation import PlotKind
from tradeexecutor.state.trade import TradeExecution
from tradeexecutor.strategy.pandas_trader.strategy_input import StrategyInput, StrategyInputIndicators
from tradeexecutor.strategy.weighting import weight_by_1_slash_n
from tradeexecutor.strategy.alpha_model import AlphaModel
from tradeexecutor.strategy.pandas_trader.strategy_input import IndicatorDataNotFoundWithinDataTolerance
from tradeexecutor.utils.crossover import contains_cross_over, contains_cross_under
from pandas_ta.overlap import ema
from decimal import Decimal
from tradeexecutor.state.trigger import TriggerType, TriggerCondition
from tradeexecutor.state.identifier import TradingPairIdentifier
import cachetools


from tradeexecutor.strategy.tvl_size_risk import USDTVLSizeRiskModel
import re

def update_tp_level(tp_levels_reached, position_id, tp_level, reached=True):
    """
    Update the TP level reached status for a given position in the provided dictionary.
    
    Parameters:
    - tp_levels_reached: Dictionary tracking TP levels for positions.
    - position_id: Unique identifier for the position.
    - tp_level: The TP level being updated (e.g., 'tp1', 'tp2').
    - reached: Boolean indicating whether the TP level has been reached.
    """
    if position_id not in tp_levels_reached:
        tp_levels_reached[position_id] = {}
    tp_levels_reached[position_id][tp_level] = reached

def is_tp_level_reached(tp_levels_reached, position_id, tp_level):
    """
    Check if a TP level has been reached for a given position in the provided dictionary.
    
    Parameters:
    - tp_levels_reached: Dictionary tracking TP levels for positions.
    - position_id: Unique identifier for the position.
    - tp_level: The TP level to check (e.g., 'tp1', 'tp2').
    
    Returns:
    - Boolean indicating whether the TP level has been reached.
    """
    return tp_levels_reached.get(position_id, {}).get(tp_level, False)


def is_volume_increasing(current_volume, volume_sma):
    """Check if the current volume is significantly higher than the SMA."""
    return current_volume > volume_sma


tp_levels_reached = {}

def generate_visualisations(timestamp, pair, social_data_indicators, visualisation, metrics):
    for metric in metrics:
        visualisation.plot_indicator(
            timestamp, 
            f"{metric} {pair.base}", 
            PlotKind.technical_indicator_detached, 
            social_data_indicators[f"{metric}_ema_short"], 
            pair=pair
            # detached_overlay_name=f"{metric} {pair.base}"
        )

        # Plot the long EMA overlay
        visualisation.plot_indicator(
            timestamp, 
            f"Long EMA {metric} {pair.base}", 
            PlotKind.technical_indicator_overlay_on_detached, 
            social_data_indicators[f"{metric}_ema_long"], 
            pair=pair, 
            detached_overlay_name=f"{metric} {pair.base}"
        )

def is_accetable(
    indicators: StrategyInputIndicators,
    parameters: StrategyParameters,
    pair: TradingPairIdentifier,
    momentum: float | None,
) -> bool:
    """Check the pair for risk acceptance

    :return:
        True if we should trade this pair
    """

    if pair in broken_trading_pairs:
        # Don't even bother to try trade this
        return False

    if pair.base.token_symbol in avoid_backtesting_tokens:
        # Manually blacklisted toen for this backtest
        return False

    # Pair does not quality yet due to low liquidity
    liquidity = indicators.get_tvl(pair=pair)
    if liquidity is None or liquidity <= parameters.min_liquidity_threshold:
        return False

    volume = indicators.get_price(pair, column="volume")
    close_price = indicators.get_price(pair=pair)
    volume_adjusted = volume / close_price

    if volume_adjusted < parameters.min_volume:
        return False

    return True


def calculate_market_confidence(
    macd: float,
    signal_line: float,
    rsi: float,
    momentum: float,
    obv_slope: float,
    parameters: StrategyParameters
) -> tuple[float, dict]:
    """Calculate market confidence score based on technical indicators.
    
    Returns:
        tuple[float, dict]: Confidence score (0-1) and detailed traits analysis
    """
    traits = {
        "macd_above_signal": {
            "condition": macd > signal_line,
            "weight": 0.25,
            "description": "MACD above signal line"
        },
        "rsi_momentum": {
            "condition": rsi > parameters.rsi_threshold,
            "weight": 0.25,
            "description": "RSI showing momentum"
        },
        "price_momentum": {
            "condition": momentum > parameters.minimum_mometum_threshold,
            "weight": 0.25,
            "description": "Strong price momentum"
        },
        "obv_uptrend": {
            "condition": obv_slope > 0,
            "weight": 0.25,
            "description": "Rising OBV"
        }
    }
    
    # Calculate confidence score
    confidence_score = sum(
        trait["weight"] for trait in traits.values() 
        if trait["condition"]
    )
    
    # Add the actual values to the traits dict for logging
    traits["macd_above_signal"]["value"] = float(macd - signal_line)
    traits["rsi_momentum"]["value"] = float(rsi)
    traits["price_momentum"]["value"] = float(momentum)
    traits["obv_uptrend"]["value"] = float(obv_slope)
    
    return confidence_score, traits


def safe_get_indicator(indicators, indicator_name, pair, column=None, default_value=None, tolerance_days=15):
    """Safely get an indicator value with error handling and tolerance.
    
    Args:
        indicators: StrategyInputIndicators instance
        indicator_name: Name of the indicator to fetch
        pair: Trading pair
        column: Specific column name for multi-column indicators
        default_value: Value to return if indicator fetch fails
        tolerance_days: Data delay tolerance in days
    
    Returns:
        Indicator value or default_value if not available
    """
    try:
        if column:
            return indicators.get_indicator_value(
                indicator_name, 
                pair=pair, 
                column=column,
                data_delay_tolerance=pd.Timedelta(days=tolerance_days)
            )
        else:
            return indicators.get_indicator_value(
                indicator_name, 
                pair=pair,
                data_delay_tolerance=pd.Timedelta(days=tolerance_days)
            )
    except (IndicatorDataNotFoundWithinDataTolerance, Exception) as e:
        print(f"Warning: Could not fetch indicator {indicator_name} for {pair}: {str(e)}")
        return default_value
    
def safe_get_indicator_series(indicators, indicator_name, pair, column=None, default_value=None, tolerance_days=15):
    """Safely get an indicator series with error handling.
    
    Args:
        indicators: StrategyInputIndicators instance
        indicator_name: Name of the indicator to fetch
        pair: Trading pair
        column: Specific column name for multi-column indicators
        default_value: Value to return if indicator fetch fails (typically empty series)
        tolerance_days: Data delay tolerance in days
    
    Returns:
        pd.Series: Indicator series or default_value (empty series) if not available
    """
    try:
        if column:
            return indicators.get_indicator_series(
                indicator_name, 
                pair=pair, 
                column=column,
                data_delay_tolerance=pd.Timedelta(days=tolerance_days)
            )
        else:
            return indicators.get_indicator_series(
                indicator_name, 
                pair=pair,
                data_delay_tolerance=pd.Timedelta(days=tolerance_days)
            )
    except Exception as e:
        print(f"Warning: Could not fetch indicator series {indicator_name} for {pair}: {str(e)}")
        return pd.Series() if default_value is None else default_value
    

def safe_get_price(indicators, pair, column="close", default_value=None, tolerance_days=7):
    """Safely get price data with error handling and tolerance.
    
    Args:
        indicators: StrategyInputIndicators instance
        pair: Trading pair
        column: Price column to fetch (close, open, high, low, volume)
        default_value: Value to return if price fetch fails
        tolerance_days: Data lag tolerance in days
    
    Returns:
        Price value or default_value if not available
    """
    try:
        return indicators.get_price(
            pair=pair,
            column=column,
            data_lag_tolerance=pd.Timedelta(days=tolerance_days)
        )
    except Exception as e:
        print(f"Warning: Could not fetch {column} price for {pair}: {str(e)}")
        return default_value

def safe_get_tvl(indicators, pair, default_value=None, tolerance_days=7):
    """Safely get TVL data with error handling and tolerance."""
    try:
        return indicators.get_tvl(
            pair=pair,
            data_lag_tolerance=pd.Timedelta(days=tolerance_days)
        )
    except Exception as e:
        print(f"Warning: Could not fetch TVL for {pair}: {str(e)}")
        return default_value
    
#Logging helpers
def strip_except_newlines(text):
    # Split by newlines, strip each line, then rejoin
    return '\n'.join(line.strip() for line in text.splitlines())

def dedent_any(text: str) -> str:
    """Dedent variable indents of the text"""
    return re.sub(r'^\s+', '', strip_except_newlines(text), flags=re.MULTILINE)

def is_acceptable(
    indicators: StrategyInputIndicators,
    parameters: StrategyParameters,
    pair: TradingPairIdentifier
) -> bool:
    """Check the pair for risk acceptance

    :return:
        True if we should trade this pair
    """

    broken_trading_pairs = get_broken_pairs(
        indicators.strategy_universe, parameters
    )

    if pair in broken_trading_pairs:
        # Don't even bother to try trade this
        return False

    avoid_backtesting_tokens = {
        # Trading jsut stops (though there is liq left)
        # https://tradingstrategy.ai/trading-view/ethereum/uniswap-v3/id-usdc-fee-30
        "PEOPLE",
        "WBTC",
    }

    if pair.base.token_symbol in avoid_backtesting_tokens:
        # Manually blacklisted toen for this backtest
        return False

    # Pair does not quality yet due to low liquidity
    liquidity = indicators.get_tvl(pair=pair)
    if liquidity is None or liquidity <= parameters.min_liquidity_threshold:
        return False

    volume = indicators.get_price(pair, column="volume")
    close_price = indicators.get_price(pair=pair)
    if (volume is not None) and (close_price is not None):
        volume_adjusted = abs(volume) / close_price
        if volume_adjusted < parameters.min_volume:
            return False
        return True
    
    return False

@cachetools.cached(
    cache=cachetools.TTLCache(ttl=60 * 60 * 2, maxsize=1000),
    key=lambda s, p: cachetools.keys.hashkey("get_broken_pairs"),
)
def get_broken_pairs(strategy_universe: TradingStrategyUniverse, parameters) -> set:
    # Run some extra sanity check for small cap tokens
    broken_trading_pairs = set()
    pairs_to_avoid = [87449]

    for pair in strategy_universe.iterate_pairs():
        reason = strategy_universe.get_trading_broken_reason(
            pair,
            min_candles_required=10,
            min_price=parameters.min_price,
            max_price=parameters.max_price,
        )
        if pair.internal_id in pairs_to_avoid:
            broken_trading_pairs.add(pair)
        if reason:
            print(
                f"FAIL: {pair} with base token {pair.base.address} may be problematic: {reason}"
            )
            broken_trading_pairs.add(pair)
        else:
            print(f"OK: {pair} included in the backtest")


    return broken_trading_pairs

def decide_trades(
    input: StrategyInput,
) -> list[TradeExecution]:
    #
    # Decision cycle setup.
    # Read all variables we are going to use for the decisions.
    #
    parameters = input.parameters
    position_manager = input.get_position_manager()
    state = input.state
    timestamp = input.timestamp
    indicators = input.indicators
    strategy_universe = input.strategy_universe

    #Initiate logging vars
    equity_before = state.portfolio.get_total_equity()
    trade_decision_report = {}
    num_signals_accepted = 0
    num_signals_rejected = 0
    signals_created = []
    signals_rejected = []
    rejected_trades_lack_of_liquidity = []

    cash = position_manager.get_current_cash()
    total_equity = state.portfolio.get_total_equity()
    if total_equity > 10_000_000:
        position_valuations = "\n".join(
            [
                f"{p} (token {p.pair.base.address}): {p.get_value()}"
                for p in state.portfolio.open_positions.values()
            ]
        )
        raise RuntimeError(
            f"Portfolio total equity exceeded 1,000,000 USD. Some broken math likely happened. Total equity is {total_equity} USD.\nOpen positions:\n{position_valuations}"
        )

    #
    # Trading logic
    #
    # We do some extra checks here as we are trading low quality
    # low cap tokens which often have outright malicious data for trading.
    #

    trades = []

    # Enable trailing stop loss after we reach the profit taking level
    #
    for position in state.portfolio.open_positions.values():
        if position.trailing_stop_loss_pct is None:
            close_price = indicators.get_price(pair=position.pair)
            if (
                close_price
                and close_price
                >= position.get_opening_price()
                * parameters.trailing_stop_loss_activation_level
            ):
                position.trailing_stop_loss_pct = parameters.trailing_stop_loss_pct
        elif position.stop_loss is None:
            position.stop_loss = parameters.stop_loss_pct

    size_risk_model = USDTVLSizeRiskModel(
        pricing_model=input.pricing_model,
        per_position_cap=parameters.per_position_cap_of_pool,  # This is how much % by all pool TVL we can allocate for a position
        missing_tvl_placeholder_usd=100_000,  # Placeholder for missing TVL data until we get the data off the chain
    )


    for pair in strategy_universe.iterate_pairs():

        if not is_acceptable(indicators, parameters, pair):
            # Skip this pair for the  risk management
            continue

        position_for_pair = state.portfolio.get_open_position_for_pair(pair)

        # Extract Social indicators here
        try:
            social_mentions = indicators.get_indicator_value(
                "social_mentions",
                pair=pair,
                index=-2,
                data_delay_tolerance=pd.Timedelta(days=15)
            )
            print(f"Social mentions for pair {pair.base.token_symbol}: {social_mentions}")
        except IndicatorDataNotFoundWithinDataTolerance:
            print(
                f"Social mentions data not found within tolerance for pair {pair}. "
                "Skipping this asset."
            )
            continue

        try:
            ema_short = indicators.get_indicator_value(
                "social_mentions_emas",
                pair=pair,
                column=f"social_mentions_ema_short",
                index=-2,
                data_delay_tolerance=pd.Timedelta(days=15)
            )
        except IndicatorDataNotFoundWithinDataTolerance:
            print(
                f"EMA short data not found within tolerance for pair {pair}. "
                "Skipping this asset."
            )
            continue

        try:
            ema_long = indicators.get_indicator_value(
                "social_mentions_emas",
                pair=pair,
                column=f"social_mentions_ema_long",
                index=-2,
                data_delay_tolerance=pd.Timedelta(days=15)
            )
        except IndicatorDataNotFoundWithinDataTolerance:
            print(
                f"EMA long data not found within tolerance for pair {pair}. "
                "Skipping this asset."
            )
            continue

        ema_short_series = indicators.get_indicator_series(
            "social_mentions_emas", pair=pair, column=f"social_mentions_ema_short"
        )
        ema_long_series = indicators.get_indicator_series(
            "social_mentions_emas", pair=pair, column=f"social_mentions_ema_long"
        )

        # Volume Based Metrics
        volume = indicators.get_price(column="volume", pair=pair)
        momentum = indicators.get_indicator_value("momentum", pair=pair)
        tvl = indicators.get_tvl(pair=pair)

        crossover_occurred = False
        try:
            crossover, crossover_index = contains_cross_over(
                ema_short_series,
                ema_long_series,
                lookback_period=parameters.cross_over_period,
                must_return_index=True,
            )
            crossover_occurred = crossover and (
                crossover_index >= -parameters.cross_over_period
            )
        except Exception as e:
            crossover = None
            crossover_occurred = False
            print("Cross over did not occur due to exception: %s", e)

        #
        # Visualisations
        #

        if input.is_visualisation_enabled():
            visualisation = (
                state.visualisation
            )  

            visualisation.plot_indicator(
                timestamp,
                f"Social mentions {pair.base}",
                PlotKind.technical_indicator_detached,
                social_mentions,
                pair=pair,
            )
            visualisation.plot_indicator(
                timestamp,
                f"Social mentions EMA {pair.base}",
                PlotKind.technical_indicator_detached,
                ema_short,
                pair=pair,
            )
            visualisation.plot_indicator(
                timestamp,
                f"Social mentions Long {pair.base}",
                PlotKind.technical_indicator_overlay_on_detached,
                ema_long,
                pair=pair,
                detached_overlay_name=f"Social mentions EMA {pair.base}",
            )
            visualisation.plot_indicator(
                timestamp,
                f"Momentum {pair.base}",
                PlotKind.technical_indicator_detached,
                momentum,
                pair=pair,
            )

            trade_decision_report[pair.base.token_symbol] = {
                "pair": pair.base.token_symbol,
                "momentum": momentum,
                "social_mentions": social_mentions,
                "crossover_occurred": crossover_occurred,
                "momentum_above_threshold": momentum >= parameters.minimum_mometum_threshold if momentum is not None else False,
                "social_mention_min_satisfied": ema_short >= parameters.social_ma_min if ema_short is not None else False,
                "all_conditions_met": (
                    crossover_occurred 
                    and momentum is not None
                    and momentum >= parameters.minimum_mometum_threshold
                    and ema_short is not None 
                    and ema_short >= parameters.social_ma_min
                )
            }

            if trade_decision_report[pair.base.token_symbol]["all_conditions_met"]:
                print(f"Accepted signal for {pair.base.token_symbol} current num_signals_accepted: {num_signals_accepted}")
                num_signals_accepted += 1
                signals_created.append(trade_decision_report[pair.base.token_symbol])
            else:
                print(f"Rejected signal for {pair.base.token_symbol} current num_signals_rejected: {num_signals_rejected}")
                num_signals_rejected += 1
                signals_rejected.append(trade_decision_report[pair.base.token_symbol])

        # Check if we are too early in the backtesting to have enough data to calculate indicators
        # if None in (volume, bb_upper_interactions_social, bb_upper_interactions_social, bb_upper_sentiment_social, social_mentions, interactions, sentiment, sma):
        if None in (volume, social_mentions, tvl): 
            continue

        # Make sure you don't trade the same base token in current traded positions
        open_positions = state.portfolio.open_positions.values()
        base_token_address = pair.base.address
        quote_token_address = pair.quote.address
        # Check if there's already an open position with the same quote token
        existing_position_with_quote = any(
            pos.pair.base.address == base_token_address for pos in open_positions
        )

        # If there's already an open position with the same quote token, skip this pair
        if existing_position_with_quote:
            continue


        if (
            len(state.portfolio.open_positions) < parameters.max_assets
            and state.portfolio.get_open_position_for_pair(pair) is None
        ):
            should_open_position = False

            if momentum is None:
                if crossover_occurred and ema_short >= parameters.social_ma_min:
                    should_open_position = True

                    # if (tvl * parameters.min_liquidity_trade_threshold) <= (
                    #     cash * parameters.allocation
                    # ):
                    #     buy_amount = tvl * parameters.min_liquidity_trade_threshold
                    # else:
                    #     buy_amount = cash * parameters.allocation

                    # print(
                    #     "Opening position for %s with %s USDC", pair, buy_amount
                    # )

                    # trades += position_manager.open_spot(
                    #     pair,
                    #     value=buy_amount,
                    #     stop_loss_pct=parameters.stop_loss_pct,
                    # )

            elif (
                crossover_occurred
                and (momentum >= parameters.minimum_mometum_threshold)
                and ema_short >= parameters.social_ma_min
            ):
                should_open_position = True

            if should_open_position:
                size_risk = size_risk_model.get_acceptable_size_for_position(
                    timestamp=timestamp,
                    pair=pair,
                    asked_value=cash * parameters.allocation,
                )
                buy_amount = size_risk.accepted_size

                print(
                    "Position size risk, pair: %s, asked: %s, accepted: %s, diagnostics: %s",
                    pair,
                    size_risk.asked_size,
                    buy_amount,
                    size_risk.diagnostics_data,
                )

                if buy_amount >= parameters.min_trade_size_usd:
                    print(
                        "Opening position for %s with %f USDC", pair, buy_amount
                    )
                    trades += position_manager.open_spot(
                        pair,
                        value=buy_amount,
                        stop_loss_pct=parameters.stop_loss_pct,
                    )
                else:
                    rejected_trades_lack_of_liquidity.append(
                        f"Skipped {trade_decision_report[pair.base.token_symbol]} due to trade size {buy_amount} USD, because it is below our minimum threshold {parameters.min_trade_size_usd} USD"
                    )
                    print(
                        "Skipping trade size %f USD, because it is below our minimum threshold %f USD",
                        buy_amount,
                        parameters.min_trade_size_usd,
                    )
            
    
    equity_after = state.portfolio.get_total_equity()

    report = dedent_any(f"""
        Trades decided: {len(trades)}
        Pairs total: {strategy_universe.data_universe.pairs.get_count()}
        Num Signals accepted: {num_signals_accepted}
        Num Signals rejected: {num_signals_rejected}
        Rejected trades due to lack of liquidity: {rejected_trades_lack_of_liquidity}
        Total equity before: {equity_before:,.2f} USD
        Total equity after: {equity_after:,.2f} USD
        Cash: {position_manager.get_current_cash():,.2f} USD
        Positions: {state.portfolio.open_positions.values()}
        Allocated to signals: {equity_after - equity_before:,.2f} USD
        Signals created: {signals_created}
        Signals rejected: {signals_rejected}
    """)

    state.visualisation.add_message(timestamp, report)

    return trades

# Backtest

- Run the backtest

In [None]:
import logging
from tradeexecutor.backtest.backtest_runner import run_backtest_inline
import logging
logging.getLogger().setLevel(logging.INFO)

try:
    result = run_backtest_inline(
        name=parameters.id,
        engine_version="0.5",
        decide_trades=decide_trades,
        create_indicators=create_indicators,
        client=client,
        universe=strategy_universe,
        parameters=parameters,
        strategy_logging=False,
        max_workers=1,
        reserve_currency="usdc", 
        start_at=parameters.backtest_start,
        end_at=parameters.backtest_end,
        # We need to set this really high value, because
        # some low cap tokens may only see 1-2 trades per year
        # and our backtesting framework aborts if it thinks
        # there is an issue with data quality
        data_delay_tolerance=pd.Timedelta(days=450),
        minimum_data_lookback_range=pd.Timedelta(days=5),
        
        # Uncomment to enable verbose logging
        log_level=logging.INFO,
    )
    state = result.state
    trade_count = len(list(state.portfolio.get_all_trades()))
    print(f"Backtesting completed, backtested strategy made {trade_count} trades")
    print(state.portfolio.get_all_trades())
except Exception as e:
    print("error", e)
    print(e.__cause__)
    raise e




# Equity curve

- Equity curve shows how your strategy accrues value over time
- A good equity curve has a stable ascending angle
- Benchmark against MATIC buy and hold

In [None]:
import pandas as pd
from tradeexecutor.analysis.multi_asset_benchmark import get_benchmark_data
from tradeexecutor.visual.benchmark import visualise_equity_curve_benchmark

# Pulls WMATIC/USDC as the benchmark
benchmark_indexes = get_benchmark_data(
    strategy_universe,
    cumulative_with_initial_cash=state.portfolio.get_initial_cash()
)

fig = visualise_equity_curve_benchmark(
    name=state.name,
    portfolio_statistics=state.stats.portfolio,
    all_cash=state.portfolio.get_initial_cash(),
    benchmark_indexes=benchmark_indexes,
    height=800,
    log_y=True,
)

fig.show()

### Technical indicator and trade visualisation
Draw the technical indicators we filled in in decide_trades()
Show the made trades on the price chart for a single trading pair
You need to zoom in to see the bollinger bands, as the default chart width is full multi-year study. However the default notebook chart mode is static images, as interactive images are a bit slow on Github Codespaces.

In [None]:

from tradeexecutor.visual.single_pair import visualise_single_pair
# from tradeexecutor.visual.
from tradingstrategy.charting.candle_chart import VolumeBarMode

pairs_dict = {}
sample_pair = None

for pair in strategy_universe.iterate_pairs():
    print(f"Pair : {pair.internal_id} with the following description {pair.get_human_description()} and {pair.get_human_description()}")
    pairs_dict[pair.get_identifier] = pair
    if pair.base.token_symbol == 'BRETT':
        sample_pair = strategy_universe.get_trading_pair(pair.internal_id)
        
start_at, end_at = state.get_strategy_start_and_end()   # Limit chart to our backtesting range
# btc_usdt = strategy_universe.get_pair_by_human_description(trading_pairs[0])
# 68900 PEOPLE
# 3228746 PEPE

figure = visualise_single_pair(
    state,
    pair_id=sample_pair.internal_id,
    execution_context=notebook_execution_context,
    candle_universe=strategy_universe.data_universe.candles,
    start_at=pd.Timestamp('2023-01-01'), #start_at,
    end_at=pd.Timestamp('2024-12-01'),#end_at, #pd.Timestamp('2023-10-01'),
    volume_bar_mode=VolumeBarMode.hidden,
    volume_axis_name="Volume (USD)",
    height = 800,
    title=f"{sample_pair.base} trades",
    detached_indicators=True
)

# write_pickle_to_gcs(figure, "test/eth_memecoin_fig_single_pair_latest.pkl")


figure.show()

# Performance metrics

- Display portfolio performance metrics
- Compare against buy and hold matic using the same initial capital

In [None]:
from tradeexecutor.analysis.multi_asset_benchmark import compare_strategy_backtest_to_multiple_assets

compare_strategy_backtest_to_multiple_assets(
    state,
    strategy_universe,
    display=True,
)

# Trading statistics

- Display summare about made trades

In [None]:
from tradeexecutor.analysis.trade_analyser import build_trade_analysis

analysis = build_trade_analysis(state.portfolio)
summary = analysis.calculate_summary_statistics()
display(summary.to_dataframe())

# Pair breakdown

- Profit for each trading pair

## GridSearch

In [None]:
from tradeexecutor.backtest.optimiser import perform_optimisation
from tradeexecutor.backtest.optimiser import prepare_optimiser_parameters
from tradeexecutor.backtest.optimiser import MinTradeCountFilter
from tradeexecutor.backtest.optimiser_functions import BalancedSharpeAndMaxDrawdownOptimisationFunction
from tradeexecutor.backtest.optimiser_functions import optimise_profit, optimise_sharpe


# How many Gaussian Process iterations we do
iterations = 4

# What do we optimise for
# search_func = BalancedSharpeAndMaxDrawdownOptimisationFunction(sharpe_weight=0.7, max_drawdown_weight=0.3)
search_func = optimise_profit

optimiser_result = perform_optimisation(
    iterations=iterations,
    search_func=search_func,
    decide_trades=decide_trades,
    strategy_universe=strategy_universe,
    parameters=prepare_optimiser_parameters(Parameters),  # Handle scikit-optimise search space
    create_indicators=create_indicators,
    result_filter=MinTradeCountFilter(50),
    timeout=20*60,    
    # Uncomment for diagnostics
    # log_level=logging.INFO,
    max_workers=1,
)

print(f"Optimise completed, optimiser searched {optimiser_result.get_combination_count()} combinations, with {optimiser_result.get_cached_count()} results read directly from cache")

In [None]:
from tradeexecutor.visual.grid_search import visualise_single_grid_search_result_benchmark

# GridSearchResult instance that gave the best performance
best_pick = optimiser_result.results[0].result

print(f"The best result found for {search_func} was {best_pick}")

fig = visualise_single_grid_search_result_benchmark(
    best_pick, 
    strategy_universe, 
    initial_cash=Parameters.initial_cash,
    log_y=False,
)
fig.show()

In [107]:
import pandas as pd
from decimal import Decimal
all_positions = result.state.portfolio.get_all_positions()

def calculate_profits(positions):
    results = []
    
    for pos in positions:
        buy_trade = None
        position_id = pos.position_id
        for trade_id, trade in pos.trades.items():
            if trade.is_buy():
                buy_trade = trade
            elif trade.is_sell():
                if buy_trade is None:
                    continue  # Skip if we haven't found a buy trade yet
                
                buy_price = float(buy_trade.planned_price)
                buy_quantity = float(buy_trade.planned_quantity)
                sell_price = float(trade.planned_price)
                sell_quantity = abs(float(trade.planned_quantity))  # Use abs() as sell quantity is negative
                executed_at = trade.executed_at
                # executed_quantity = trade.executed_quantity
                # executed_reserve = trade.executed_reserve
                unrealized_profit = pos.get_unrealised_and_realised_profit_percent()
                
                profit = (sell_price - buy_price) * sell_quantity
                profit_percentage = (sell_price / buy_price - 1) * 100
                
                # Determine the type of sell
                sell_type = "Unknown"
                if trade.is_partial_take_profit:
                    sell_type = "Partial Take Profit"
                elif trade.is_stoploss():
                    sell_type = "Stop Loss"
                elif trade.planned_price:
                    sell_type = "Take Profit (Planned Price)"

                trade
                
                results.append({
                    'Position ID': position_id,
                    'Trade ID': trade_id,
                    'Buy Price': buy_price,
                    'Sell Price': sell_price,
                    'Sell Quantity': sell_quantity,
                    'Profit': profit,
                    'Profit Percentage': profit_percentage,
                    'Executed At': executed_at,
                    # 'Executed Quantity': float(executed_quantity),
                    # 'Executed Reserve': float(executed_reserve),
                    'Unrealized Profit %': float(unrealized_profit),
                    'Sell Type': sell_type,
                    'Trade Status': trade.get_status(),
                    'Base Token': pos.pair.base.token_symbol
                })
    
    return pd.DataFrame(results)

# Assuming 'all_positions' is your iterable of positions
df_profits = calculate_profits(all_positions)


- Visualise some custom indicator data, so we know it looks correct
- We pick one of tokens in the sample data set, and pull outs it custom indicator data generated or loaded during the backtest run

Show raw custom indicator data.

In [None]:
import pandas as pd
import plotly.graph_objects as go

# Example to demonstrate using a figure (This should be replaced with your actual fig object)
# fig = go.Figure() # You would actually use your existing 'fig' from your code

# Function to extract data from Plotly figure
def extract_data_from_plotly_fig(fig):
    data = {}
    dates = None

    for trace in fig.data:
        if 'x' in trace and 'y' in trace:
            if dates is None:
                dates = trace.x  # Assuming all traces share the same 'x' dates
            data[trace.name] = trace.y

    # Creating DataFrame
    df = pd.DataFrame(data, index=dates)
    df.index.name = 'Date'
    return df

# Call the function with your figure
df = extract_data_from_plotly_fig(fig)

# Saving the DataFrame to CSV
csv_path = "gs://taraxa-research/test/extracted_trendspotting_backtest_plot_data.csv"
df.to_csv(csv_path)

# Output to check
print(df.head())