In [None]:
from fastlane_bot.tests.deterministic.base_imports import *
from fastlane_bot.data.abi import CARBON_CONTROLLER_ABI, UNISWAP_V3_POOL_ABI, UNISWAP_V2_POOL_ABI, PANCAKESWAP_V3_POOL_ABI, PANCAKESWAP_V2_POOL_ABI
import time

# Import the test_matrix

In [None]:
test_matrix = pd.read_csv(r"fastlane_bot\tests\deterministic\test_matrix.csv")
test_matrix

In [None]:
for test_run_name in test_matrix['Test Batch'].unique():
    blockchain = test_matrix[test_matrix['Test Batch']==test_run_name]['Blockchain'].values[0]
    click_options = test_matrix[test_matrix['Test Batch']==test_run_name]['click_options_input'].values[0]
    new_fork = bool(test_matrix[test_matrix['Test Batch']==test_run_name]['new_fork'].values[0])
    static_pool_data_filename= [x for x in click_options.split(" --") if "static_pool_data" in x][0].replace("static_pool_data_filename=","")
    arb_mode = [x for x in click_options.split(" --") if "arb_mode" in x][0].replace("arb_mode=","")
    special_action = ast.literal_eval(test_matrix[test_matrix['Test Batch']==test_run_name]['special_action'].values[0])
    print(test_run_name, blockchain, arb_mode, static_pool_data_filename, new_fork)
    print(click_options)

# Select a new fork or existing

In [None]:
## to add a click option for this
if new_fork:
    url = create_new_testnet(blockchain=blockchain)
    # rewrite the click_options with the new rpc_url generated
    click_options = ' --'.join([x for x in click_options.split(" --") if "rpc_url" not in x] + [f"rpc_url={url}"])
else:
    url= [x for x in click_options.split(" --") if "rpc_url" in x][0].replace("rpc_url=","")
url

In [None]:
# Connect to the RPC:
w3 = Web3(Web3.HTTPProvider(url))
assert w3.is_connected() == True, "Web3 not connected"

In [None]:
# Initialize the Carbon Controller contract
CarbonController_address = pd.read_csv(r"fastlane_bot\data\multichain_addresses.csv").query("exchange_name=='carbon_v1'").query(f"chain=='{blockchain}'").factory_address.values[0]
fromBlock = int(pd.read_csv(r"fastlane_bot\data\multichain_addresses.csv").query("exchange_name=='carbon_v1'").query(f"chain=='{blockchain}'").start_block.values[0])
CarbonController = w3.eth.contract(address=CarbonController_address, abi=CARBON_CONTROLLER_ABI)

# Get Carbon Strategies and Delete

In [None]:
if arb_mode == 'multi_pairwise_pol':
    pass
else:
    # Get the current state
    strategy_created_df, strategy_deleted_df, remaining_carbon_strategies = get_state_of_carbon_strategies(w3, CarbonController, fromBlock=fromBlock)

    # takes about 4 minutes per 100 strategies
    # so 450 ~ 18 minutes
    undeleted_strategies = delete_all_carbon_strategies(w3, CarbonController, blockchain, carbon_strategy_id_owner_list=remaining_carbon_strategies)

    # These strategies cannot be deleted on Ethereum
    known_unable_to_delete = {
        68737038118029569619601670701217178714718: ("pDFS", "ETH"), #pDFS 
        }
    assert all([x in known_unable_to_delete for x in undeleted_strategies]), f"Strategies not deleted that are unknown: {undeleted_strategies}"

    # Redundant for checking state
    strategy_created_df, strategy_deleted_df, remaining_carbon_strategies = get_state_of_carbon_strategies(w3, CarbonController, fromBlock=fromBlock)

# The above resets the testnet

# Set the Test State on external pools
Since this method uses backdate_pool=True to fetch the current state of the external pools, then ALL the external pools included in a single test bot run must have their states updated prior to initializing the bot

In [None]:
# Import pool data
static_pool_data_testing_path = os.path.normpath(rf"fastlane_bot\data\blockchain_data\{blockchain}\{static_pool_data_filename}.csv")
test_pools = pd.read_csv(static_pool_data_testing_path, dtype=str)

# Overwrite the last_updated_block field with a recent block due to bot events fetching bug
testing_start_block = int(w3.eth.block_number)
test_pools['last_updated_block'] = testing_start_block
test_pools.to_csv(static_pool_data_testing_path)

# Handle each exchange_type differently for the required updates
if 'test' in test_pools.columns:
    for index in test_pools.index:
        exchange_type = test_pools["exchange_type"][index]
        print(test_pools["exchange_type"][index], test_pools["exchange"][index])
        handle_exchange_parameters(w3, test_pools.iloc[index])
        
        # Optional validation to ensure that all states are set correctly
        # Calls all relevant functions and verifies slot content againt input data
        run_slot_update_tests(w3, test_pools)

# Start a bot instance to run all tests against
- Limitations are one bot per mode per test

In [None]:
# Start the bot
child_process = initialize_bot_new(click_options)

# Wait until the pool data populates
most_recent_log_folder = await_first_iteration()

In [None]:
if special_action:
    for key,value in special_action.items():
        # Check if the key is in the function map
        if key in function_map:
            # Call the corresponding function with the value
            function_map[key](w3, value)
        else:
            raise ValueError(f"key {key} not in function_map")

In [None]:
# Impore test strategies
test_strategies_path = os.path.normpath(rf"fastlane_bot\data\blockchain_data\{blockchain}\test_strategies_{test_run_name}.json")
with open(test_strategies_path) as file:
    test_strategies = json.load(file)['test_strategies']
    print(f"{len(test_strategies.keys())} test strategies imported")

# Mark the block that new strats were created
strats_created_fromBlock = w3.eth.get_block_number()
print("strats_created_fromBlock", strats_created_fromBlock)

# populate a dictionary with all the relevant test strategies
test_strategy_txhashs = {}
for i in range(1,len(test_strategies.keys())+1):
    i = str(i)
    test_strategy = test_strategies[i]
    get_token_approval(w3, test_strategy['token0'], CarbonController.address, test_strategy['wallet'])
    get_token_approval(w3, test_strategy['token1'], CarbonController.address, test_strategy['wallet'])
    txhash = createStrategy_fromTestDict(w3, CarbonController, test_strategy)
    test_strategy_txhashs[i] = {}
    test_strategy_txhashs[i]['txhash'] = txhash

In [None]:
if len(test_strategy_txhashs)>0:
    # collect the new state of relevant strategies created
    strategy_created_df, strategy_deleted_df, remaining_carbon_strategies = get_state_of_carbon_strategies(w3, CarbonController, fromBlock=strats_created_fromBlock)
    new_strats_created = strategy_created_df["id"].to_list()
    print(f"There have been {len(new_strats_created)} new strategies created")

    # add the strategy ids
    for i in test_strategy_txhashs.keys():
        test_strategy_txhashs[i]['strategyid'] = strategy_created_df.query(f"transactionHash == '{test_strategy_txhashs[i]['txhash']}'").id.values[0]

    # Wait for the arbs to execute
    #this could be replaced with a loop searching for a successful tx
    print("Sleeping while arbs are executed...")
    if blockchain == 'ethereum':
        time.sleep(int(35*len(test_strategy_txhashs.keys()) + 15)) 
    if blockchain == 'coinbase_base':
        time.sleep(int(5*len(test_strategy_txhashs.keys()) + 15))     

    # First layer verification
    # Check that all strategies recently created have been traded on
    updated_events_df = get_GenericEvents(w3, CarbonController, "StrategyUpdated", fromBlock=strats_created_fromBlock)
    if len(updated_events_df) == 0:
        print("Not all strategies created have been traded against")
    else:
        strategies_traded_against =  list(set(updated_events_df[updated_events_df.reason==1].id.to_list()))
    if not all(item in strategies_traded_against for item in new_strats_created):
        print("Not all strategies created have been traded against")

In [None]:
# get all successful_txs
all_successful_txs = glob.glob(os.path.join(most_recent_log_folder, "*.txt"))

# Read the succussful_txs in as strings
txt_all_successful_txs = []
for successful_tx in all_successful_txs:
    with open(successful_tx, 'r') as file:
        j = file.read()
        txt_all_successful_txs += [(j)]
        file.close()

# Successful transactions on Tenderly are marked by status=1
actually_txt_all_successful_txs = [tx for tx in txt_all_successful_txs if "'status': 1" in tx]

In [None]:
# Import expected test results
test_results_path = os.path.normpath(rf"fastlane_bot\data\blockchain_data\{blockchain}\test_results_{test_run_name}.json")
with open(test_results_path) as f:
    test_datas = json.load(f)['test_data']
    f.close()
    print(f"{len(test_datas.keys())} test results imported")

if arb_mode == 'multi_pairwise_pol':
    tests_passed = any(['bancor_pol' in x for x in actually_txt_all_successful_txs ])
else:
    # Loop over the created test strategies and verify test data
    tests_passed=True
    for i in test_strategy_txhashs.keys():
        search_id = test_strategy_txhashs[i]['strategyid']
        print(f"Evaluating test {i}, {search_id}")
        tx_data = get_tx_data(search_id, actually_txt_all_successful_txs)
        clean_tx_data(tx_data)
        test_data = test_datas[i]
        if tx_data == test_data:
            print(f"Test {i} PASSED")
        else:
            print(f"Test {i} FAILED")
            tests_passed = False
if tests_passed:
    print("ALL TESTS PASSED")

In [None]:
# Terminate the bot background process
if child_process.terminate():
    print("Arb Bot shutdown complete")