In [7]:
from datamodel import (
    Listing,
    Observation,
    Order,
    OrderDepth,
    ProsperityEncoder,
    UserId,
    Symbol,
    Trade,
    TradingState,
)
from typing import List
import copy
import numpy as np
import math
from itertools import permutations
import Round_1

###############
import sys
import os

sys.path.append(os.path.abspath("../"))

import webbrowser
from datetime import datetime
from pathlib import Path
from typing import Any, Optional
from importlib import reload
from collections import defaultdict
from functools import partial, reduce
from http.server import HTTPServer, SimpleHTTPRequestHandler

from Backtest.data import has_day_data, read_day_data
from Backtest.file_reader import FileReader, FileSystemReader, PackageResourcesReader
from Backtest.models import BacktestResult
from Backtest.runner import run_backtest
################


import json
from typing import Any

In [8]:
class tradable_product:
    RAINFOREST_RESIN = "RAINFOREST_RESIN"
    KELP = "KELP"
    SQUID_INK = "SQUID_INK"


class position_limit:
    KELP = 50
    RAINFOREST_RESIN = 50
    SQUID_INK = 50

In [9]:
def merge_results(
    a: BacktestResult,
    b: BacktestResult,
    merge_profit_loss: bool,
    merge_timestamps: bool,
) -> BacktestResult:
    sandbox_logs = a.sandbox_logs[:]
    activity_logs = a.activity_logs[:]
    trades = a.trades[:]

    if merge_timestamps:
        a_last_timestamp = a.activity_logs[-1].timestamp
        timestamp_offset = a_last_timestamp + 100
    else:
        timestamp_offset = 0

    sandbox_logs.extend([row.with_offset(timestamp_offset) for row in b.sandbox_logs])
    trades.extend([row.with_offset(timestamp_offset) for row in b.trades])

    if merge_profit_loss:
        profit_loss_offsets = defaultdict(float)
        for row in reversed(a.activity_logs):
            if row.timestamp != a_last_timestamp:
                break

            profit_loss_offsets[row.columns[2]] = row.columns[-1]

        activity_logs.extend(
            [
                row.with_offset(timestamp_offset, profit_loss_offsets[row.columns[2]])
                for row in b.activity_logs
            ]
        )
    else:
        activity_logs.extend(
            [row.with_offset(timestamp_offset, 0) for row in b.activity_logs]
        )

    return BacktestResult(a.round_num, a.day_num, sandbox_logs, activity_logs, trades)


def write_output(output_file: Path, merged_results: BacktestResult) -> None:
    output_file.parent.mkdir(parents=True, exist_ok=True)
    with output_file.open("w+", encoding="utf-8") as file:
        file.write("Sandbox logs:\n")
        for row in merged_results.sandbox_logs:
            file.write(str(row))

        file.write("\n\n\nActivities log:\n")
        file.write(
            "day;timestamp;product;bid_price_1;bid_volume_1;bid_price_2;bid_volume_2;bid_price_3;bid_volume_3;ask_price_1;ask_volume_1;ask_price_2;ask_volume_2;ask_price_3;ask_volume_3;mid_price;profit_and_loss\n"
        )
        file.write("\n".join(map(str, merged_results.activity_logs)))

        file.write("\n\n\n\n\nTrade History:\n")
        file.write("[\n")
        file.write(",\n".join(map(str, merged_results.trades)))
        file.write("]")


class HTTPRequestHandler(SimpleHTTPRequestHandler):
    def end_headers(self) -> None:
        self.send_header("Access-Control-Allow-Origin", "*")
        return super().end_headers()

    def log_message(self, format: str, *args: Any) -> None:
        return


def open_visualizer(output_file: Path, no_requests: int) -> None:
    http_handler = partial(HTTPRequestHandler, directory=output_file.parent)
    http_server = HTTPServer(("localhost", 0), http_handler)

    webbrowser.open(
        f"https://jmerle.github.io/imc-prosperity-3-visualizer/?open=http://localhost:{http_server.server_port}/{output_file.name}"
    )

    # Chrome makes 2 requests: 1 OPTIONS request to check for CORS headers and 1 GET request to get the data
    # Some users reported their browser only makes 1 request, which is covered by the --vis-requests option
    for _ in range(no_requests):
        http_server.handle_request()


def format_path(path: Path) -> str:
    cwd = Path.cwd()
    if path.is_relative_to(cwd):
        return str(path.relative_to(cwd))
    else:
        return str(path)


def parse_data(data_root: Optional[str]) -> FileReader:
    if data_root is not None:
        return FileSystemReader(Path(data_root).expanduser().resolve())
    else:
        return PackageResourcesReader()


def parse_days(file_reader: FileReader, days: list[str]) -> list[tuple[int, int]]:
    parsed_days = []

    for arg in days:
        if "-" in arg:
            round_num, day_num = map(int, arg.split("-", 1))

            if not has_day_data(file_reader, round_num, day_num):
                # print(f"Warning: no data found for round {round_num} day {day_num}")
                continue

            parsed_days.append((round_num, day_num))
        else:
            round_num = int(arg)

            parsed_days_in_round = []
            for day_num in range(-5, 6):
                if has_day_data(file_reader, round_num, day_num):
                    parsed_days_in_round.append((round_num, day_num))

            if len(parsed_days_in_round) == 0:
                # print(f"Warning: no data found for round {round_num}")
                continue

            parsed_days.extend(parsed_days_in_round)

    if len(parsed_days) == 0:
        # print("Error: did not find data for any requested round/day")
        sys.exit(1)

    return parsed_days


def parse_out(out: Optional[str], no_out: bool) -> Optional[Path]:
    if out is not None:
        return Path(out).expanduser().resolve()

    if no_out:
        return None

    timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    return Path.cwd() / "backtests" / f"{timestamp}.log"


def print_day_summary(result: BacktestResult) -> None:
    last_timestamp = result.activity_logs[-1].timestamp

    product_lines = []
    total_profit = 0

    for row in reversed(result.activity_logs):
        if row.timestamp != last_timestamp:
            break

        product = row.columns[2]
        profit = row.columns[-1]

        product_lines.append(f"{product}: {profit:,.0f}")
        total_profit += profit

    print(*reversed(product_lines), sep="\n")
    print(f"Total profit: {total_profit:,.0f}")


def print_overall_summary(results: list[BacktestResult]) -> None:
    # print("Profit summary:")

    total_profit = 0
    for result in results:
        last_timestamp = result.activity_logs[-1].timestamp

        profit = 0
        for row in reversed(result.activity_logs):
            if row.timestamp != last_timestamp:
                break

            profit += row.columns[-1]

        # print(f"Round {result.round_num} day {result.day_num}: {profit:,.0f}")
        total_profit += profit

    # print(f"Total profit: {total_profit:,.0f}")
    return total_profit


def main(params: dict, output_file=None) -> None:
    file_reader = parse_data(
        "C:/Users/edmun/OneDrive/Desktop/2025-IMC-Global-Trading-Challenge/Backtest/resources"
    )
    days = parse_days(file_reader, [str(day) for day in range(0, 5)])
    results = []
    for round_num, day_num in days:
        reload(Round_1)
        data = read_day_data(file_reader, round_num, day_num, no_names=True)
        result = run_backtest(
            trader=Round_1.Trader(params),
            data=data,
            # file_reader=file_reader,
            # round_num=round_num,
            # day_num=day_num,
            print_output=False,
            disable_trades_matching=False,
            # no_names=True,  # args.no_names,
            show_progress_bar=False,
        )

        # print_day_summary(result)
        if len(days) > 1:
            print()

        results.append(result)

    if output_file is not None:
        merged_results = reduce(
            lambda a, b: merge_results(
                a, b, merge_profit_loss=True, merge_timestamps=True
            ),
            results,
        )
        write_output(output_file, merged_results)
        print(f"\nSuccessfully saved backtest results to {format_path(output_file)}")
        open_visualizer(output_file, 2)

    final_profit = print_overall_summary(results)
    return final_profit

In [None]:
params = {
    tradable_product.RAINFOREST_RESIN: {
        "ask_slip": 1,
        "bid_slip": -1,
        "decay": 0.1,
        "threshold": 0.0,
    },
    tradable_product.KELP: {
        "mu": 0,
        "sigma2": 0.43145155,
        "MA1_coeff": -0.63535844,
        "most_mid_coeff": 0.55108093,
        "most_micro_coeff": -0.13851862,
        "decay": 0.6,
        "threshold": 0.8,
    },
    tradable_product.SQUID_INK: {
        "alpha": np.array([[-2.90656873e-04], [2.90141692e-05]]),
        "beta": np.array([[1.0, -0.97599632]]),
        "gamma": np.array([[-0.07401955, -0.05825519], [-0.00298338, -0.18217324]]),
    },
}

result = main(
    params=params,
    output_file=Path(
        "C:/Users/edmun/OneDrive/Desktop/2025-IMC-Global-Trading-Challenge/Round 1/test.log"
    ),
)






Successfully saved backtest results to test.log


In [None]:
param_range = np.arange(-1, 1, 0.25)
all_combi = list(permutations(param_range, 2))

best = 0
perm = {"PNL": [], "ask_slip": [], "bid_slip": [], "decay": [], "threshold": []}

print(f"Permuations Count: {len(all_combi)}")

for ask_slip, bid_slip in all_combi:
    # for window in np.arange(0.1, 1, 0.1):
    # for threshold in np.arange(0., 1.1, 0.1):
    # bid_slip = 0
    # ask_slip = 0
    window = 0.3
    threshold = 0
    params = {
        tradable_product.RAINFOREST_RESIN: {
            "ask_slip": 1,
            "bid_slip": -1,
            "decay": 0.1,
            "threshold": 0.0,
        },
        tradable_product.KELP: {
            "alpha": np.array([[-2.90656873e-04], [2.90141692e-05]]),
            "beta": np.array([[1.0, -0.97599632]]),
            "gamma": np.array([[-0.07401955, -0.05825519], [-0.00298338, -0.18217324]]),
            "decay": 0.6,
            "threshold": 0.8,
        },
        tradable_product.SQUID_INK: {
            "ask_slip": ask_slip,
            "bid_slip": bid_slip,
            "alpha": np.array([[-2.90656873e-04], [2.90141692e-05]]),
            "beta": np.array([[1.0, -0.97599632]]),
            "gamma": np.array([[-0.07401955, -0.05825519], [-0.00298338, -0.18217324]]),
        },
    }

    result = main(params=params)

    perm["PNL"].append(result)
    perm["ask_slip"].append(ask_slip)
    perm["bid_slip"].append(bid_slip)
    perm["decay"].append(window)
    perm["threshold"].append(threshold)

    if result > best:
        best = result
        print(best, ask_slip, bid_slip, window, threshold)

print(best)


Permuations Count: 56
































































































































































































































0


In [25]:
import pandas as pd

pd.DataFrame.from_dict(perm).sort_values(by="PNL", ascending=False)

Unnamed: 0,PNL,ask_slip,bid_slip,decay,threshold
13,10640.0,-0.5,-1.0,0.3,0
1,10491.0,-1.5,-0.5,0.3,0
7,10389.0,-1.0,-0.5,0.3,0
0,10141.0,-1.5,-1.0,0.3,0
8,10130.0,-1.0,0.0,0.3,0
2,10068.0,-1.5,0.0,0.3,0
6,9027.0,-1.0,-1.5,0.3,0
19,8978.0,0.0,-1.0,0.3,0
3,8428.0,-1.5,0.5,0.3,0
14,8420.0,-0.5,0.0,0.3,0


In [None]:
def ladder(qty, n, decay):
    raw_values = [math.exp(-decay * i) for i in range(1, n + 1)]
    sum_raw = sum(raw_values)
    ladder = [math.floor(qty * value / sum_raw) for value in raw_values]
    return ladder


ladder(50, 4, 0.6)

[24, 13, 7, 4]