In [None]:
import os
# Select the legacy Keras 2 implementation (tf-keras) instead of Keras 3 (keras).
# NOTE(review): presumably this has to run before TensorFlow is imported,
# which is why it sits in its own cell ahead of the imports - confirm.
os.environ.update({'TF_USE_LEGACY_KERAS': '1'})

In [None]:
import numpy as np
import pandas as pd
import tensorflow as tf

from tf_agents.agents import lin_ucb_agent
from tf_agents.bandits.environments import bandit_py_environment
from tf_agents.bandits.metrics import tf_metrics as tf_bandit_metrics
from tf_agents.drivers import dynamic_step_driver
from tf_agents.environments import tf_py_environment
from tf_agents.replay_buffers import tf_uniform_replay_buffer
from tf_agents.specs import array_spec
from tf_agents.trajectories import time_step as ts

In [None]:
# --- 1. Data Simulation (Replace with your real data) ---
# For this example, we simulate data. In a real scenario, you would load
# your OHLCV data here.
def generate_dummy_data(num_steps, num_cryptos):
    """Generate a DataFrame of simulated close prices, one random walk per asset.

    Each series starts at a fixed initial price and is multiplied at every
    step by ``1 + r`` where ``r`` is drawn from a normal distribution with
    mean 0 and standard deviation 0.02 (2%). Draws come from NumPy's global
    random state, so call ``np.random.seed`` beforehand for reproducibility.

    Args:
        num_steps: Number of rows (time steps) to simulate. Must be >= 0.
        num_cryptos: Number of assets to simulate. The first ``num_cryptos``
            of BTC, ETH, ADA are used, so the value must be between 0 and 3.

    Returns:
        A ``pandas.DataFrame`` with ``num_steps`` rows and one
        ``'<NAME>_close'`` column per simulated asset.

    Raises:
        ValueError: If ``num_steps`` is negative or ``num_cryptos`` is outside
            the supported range.
    """
    names = ('BTC', 'ETH', 'ADA')
    initial_prices = (40000, 2000, 1.5)

    if num_steps < 0:
        raise ValueError(f"num_steps must be non-negative, got {num_steps}")
    if not 0 <= num_cryptos <= len(names):
        raise ValueError(
            f"num_cryptos must be between 0 and {len(names)}, got {num_cryptos}"
        )

    data = {}
    for name, start_price in zip(names[:num_cryptos], initial_prices):
        # One return per transition between consecutive rows.
        step_returns = np.random.normal(0.0, 0.02, size=max(num_steps - 1, 0))
        # cumprod over [p0, 1 + r1, 1 + r2, ...] reproduces the recurrence
        # prices[t] = prices[t - 1] * (1 + r_t) without a Python loop.
        growth = np.concatenate(([start_price], 1.0 + step_returns))
        # The slice yields an empty series when num_steps == 0.
        data[f'{name}_close'] = np.cumprod(growth)[:num_steps]
    return pd.DataFrame(data)

# Seed NumPy's global RNG so the simulated prices (and everything derived
# from them) are identical on every Restart & Run All.
SEED = 42
np.random.seed(SEED)

CRYPTO_NAMES = ['BTC', 'ETH', 'ADA']
# Derived from the name list so the two can never drift apart.
NUM_CRYPTOS = len(CRYPTO_NAMES)
NUM_STEPS = 2000
data = generate_dummy_data(NUM_STEPS, NUM_CRYPTOS)

data.head(5)

In [None]:

# --- 2. Create the Bandit Environment ---
class CryptoTradingEnvironment(bandit_py_environment.BanditPyEnvironment):
    """Contextual-bandit environment over a table of crypto close prices.

    The context is the most recent ``lookback_window`` percentage returns of
    every asset in ``CRYPTO_NAMES``. There are ``2 * len(CRYPTO_NAMES)``
    actions: action ``2 * i`` buys asset ``i`` and action ``2 * i + 1`` sells
    it. The reward is the asset's percentage price change over the next step,
    negated for sell actions.
    """

    def __init__(self, data, lookback_window=10):
        """Build the environment.

        Args:
            data: DataFrame with a ``'<NAME>_close'`` column for every name in
                ``CRYPTO_NAMES``.
            lookback_window: Number of past returns per asset in the context.

        Raises:
            ValueError: If ``lookback_window`` is smaller than 1 or ``data``
                has fewer than ``lookback_window + 2`` rows (the minimum
                needed to build one context and score one action).
        """
        if lookback_window < 1:
            raise ValueError(
                f"lookback_window must be at least 1, got {lookback_window}"
            )
        if len(data) < lookback_window + 2:
            raise ValueError(
                f"data needs at least {lookback_window + 2} rows for "
                f"lookback_window={lookback_window}, got {len(data)}"
            )

        self._data = data
        self._lookback = lookback_window
        self._close_columns = [f'{name}_close' for name in CRYPTO_NAMES]
        self._num_cryptos = len(CRYPTO_NAMES)
        self._num_actions = self._num_cryptos * 2  # Buy/Sell for each crypto
        self._current_step = self._lookback  # Start after the first lookback period

        # Observation: past `lookback_window` returns for each crypto
        observation_spec = array_spec.ArraySpec(
            shape=(self._num_cryptos * self._lookback,),
            dtype=np.float32,
            name='context'
        )
        # Action: Buy/Sell for each crypto
        action_spec = array_spec.BoundedArraySpec(
            shape=(), dtype=np.int32, minimum=0, maximum=self._num_actions - 1, name='action'
        )

        super().__init__(observation_spec, action_spec)

    def _observe(self):
        """Return the context vector for the current step.

        Returns:
            A float32 array of length ``num_cryptos * lookback_window`` holding
            the returns row by row (all assets at the oldest step first),
            zero-padded at the end if rows were dropped because of NaNs.
        """
        end = self._current_step
        start = end - self._lookback

        # lookback + 1 prices are needed to get `lookback` returns. The slice
        # includes the current price, which is also the entry price used by
        # `_apply_action`, so no future information leaks into the context.
        window = self._data[self._close_columns].iloc[start:end + 1]
        returns = window.pct_change().dropna()

        # Flatten the returns into a single context vector
        flat_returns = returns.to_numpy().flatten()

        # Ensure the vector always matches the observation spec.
        required_size = self._num_cryptos * self._lookback
        padding_size = required_size - len(flat_returns)
        if padding_size > 0:
            flat_returns = np.pad(flat_returns, (0, padding_size), 'constant')

        return flat_returns.astype(np.float32)

    def _apply_action(self, action):
        """Score the chosen action and advance one step.

        Args:
            action: Integer in ``[0, 2 * num_cryptos)``; even values buy and
                odd values sell asset ``action // 2``.

        Returns:
            The percentage price change of the chosen asset from the current
            step to the next one, negated for sell actions.
        """
        crypto_index, side = divmod(int(action), 2)
        prices = self._data[self._close_columns[crypto_index]]

        current_price = prices.iloc[self._current_step]
        next_price = prices.iloc[self._current_step + 1]

        # Simple reward: percent change
        price_change = (next_price - current_price) / current_price
        # Buying pays off when the price rises, selling when it falls.
        reward = price_change if side == 0 else -price_change

        self._current_step += 1
        # The next action needs the row after `_current_step`; once that row
        # no longer exists, start over from the first full lookback window
        # instead of failing with an IndexError.
        if self._current_step + 1 >= len(self._data):
            self._current_step = self._lookback
        return reward

# --- 3. Instantiate Environment and Agent ---
LOOKBACK_WINDOW = 10
NUM_ACTIONS = len(CRYPTO_NAMES) * 2

# Wrap the Python environment in a TF environment
tf_env = tf_py_environment.TFPyEnvironment(
    CryptoTradingEnvironment(data, lookback_window=LOOKBACK_WINDOW)
)

# Create the LinUCB Agent
observation_spec = tf_env.observation_spec()
time_step_spec = ts.time_step_spec(observation_spec)
action_spec = tf_env.action_spec()

agent = lin_ucb_agent.LinearUCBAgent(
    time_step_spec=time_step_spec,
    action_spec=action_spec,
    alpha=1.0,  # Alpha controls exploration. Higher alpha = more exploration.
    dtype=tf.float32
)

# --- 4. Define the Training Loop ---
NUM_TRAINING_STEPS = 1000 # Must be less than NUM_STEPS - LOOKBACK_WINDOW

# RegretMetric calls the baseline function with the trajectory observation,
# so it must accept one argument. The baseline reward used here is a constant
# 0, which makes the reported regret equal to minus the reward of the step.
regret_metric = tf_bandit_metrics.RegretMetric(lambda _observation: 0.0)

def get_action_name(action):
    """Return a human-readable label such as 'BUY BTC' for an action index.

    Even indices are buys and odd indices are sells; ``action // 2`` selects
    the entry of ``CRYPTO_NAMES``.
    """
    asset_position, side = divmod(action, 2)
    verb = "SELL" if side else "BUY"
    return f"{verb} {CRYPTO_NAMES[asset_position]}"
    
# Use a driver to run the loop
STEPS_PER_LOOP = 1    # Environment steps collected between two agent updates
NUM_EVAL_STEPS = 50   # Steps replayed with the trained policy afterwards

# The replay buffer holds the trajectories collected by one `driver.run()`
# call so they can be handed to `agent.train()`. Without it the agent never
# sees any experience and its policy never changes.
replay_buffer = tf_uniform_replay_buffer.TFUniformReplayBuffer(
    data_spec=agent.policy.trajectory_spec,
    batch_size=tf_env.batch_size,
    max_length=STEPS_PER_LOOP,
)

driver = dynamic_step_driver.DynamicStepDriver(
    env=tf_env,
    policy=agent.collect_policy,
    num_steps=STEPS_PER_LOOP * tf_env.batch_size,
    observers=[replay_buffer.add_batch, regret_metric]  # You can add more observers
)

print("Starting training...")
training_reward = 0.0
for _ in range(NUM_TRAINING_STEPS // STEPS_PER_LOOP):
    driver.run()
    experience = replay_buffer.gather_all()
    agent.train(experience)
    # Sum the rewards actually received; the regret metric only keeps the
    # value of the most recent step, so it cannot serve as a running total.
    training_reward += float(tf.reduce_sum(experience.reward))
    replay_buffer.clear()
print("Training finished.")

# --- 5. Evaluate the Results ---
print(f"Total reward earned over {NUM_TRAINING_STEPS} steps: {training_reward:.4f}")
print(f"Regret on the final training step: {regret_metric.result().numpy():.4f}")

# Let's check the last few decisions
time_step = tf_env.reset()
eval_reward = 0.0
for i in range(NUM_EVAL_STEPS):
    action_step = agent.policy.action(time_step)
    # Pass the batched action tensor to the TF environment; only unpack the
    # scalar for display purposes.
    time_step = tf_env.step(action_step.action)
    action = action_step.action.numpy()[0]
    reward = time_step.reward.numpy()[0]
    eval_reward += reward
    print(f"Step {i}: Chose action '{get_action_name(action)}', received reward {reward:.4f}")

print(f"\nFinal cumulative reward over last {NUM_EVAL_STEPS} steps: {eval_reward:.4f}")