# Chronos-T5 Testing on BVMT Data

Testing Amazon's Chronos-T5 time-series forecasting model on Tunisian stock market data from the BVMT (Bourse des Valeurs Mobilières de Tunis).

In [None]:
# Install dependencies
!pip install git+https://github.com/amazon-science/chronos-forecasting.git
!pip install torch transformers matplotlib plotly pandas numpy requests

Collecting git+https://github.com/amazon-science/chronos-forecasting.git
  Cloning https://github.com/amazon-science/chronos-forecasting.git to /tmp/pip-req-build-khd_x18f
  Running command git clone --filter=blob:none --quiet https://github.com/amazon-science/chronos-forecasting.git /tmp/pip-req-build-khd_x18f
  Resolved https://github.com/amazon-science/chronos-forecasting.git to commit 1f099eb265a4b423529929321929d4258dc031d8
  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
Collecting transformers<5,>=4.41 (from chronos-forecasting==2.2.2)
  Downloading transformers-4.57.6-py3-none-any.whl.metadata (43 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m44.0/44.0 kB[0m [31m2.2 MB/s[0m eta [36m0:00:00[0m
Collecting huggingface_hub>=0.21.0 (from accelerate<2,>=0.34->chronos-forecasting==2.2.2)
  Downloading huggingface_hub-0.36.2-py3-none-any.w

In [None]:
import warnings
warnings.filterwarnings('ignore')

import torch
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import plotly.graph_objects as go
from chronos import ChronosPipeline
import requests
from io import StringIO

# Report the installed PyTorch build and whether a CUDA device is usable.
print(f'PyTorch: {torch.__version__}\nCUDA: {torch.cuda.is_available()}')



PyTorch: 2.9.0+cpu
CUDA: False


## Load Chronos Model

In [None]:
# Checkpoint to load and the device to place it on (GPU when available, else CPU).
MODEL_NAME = 'amazon/chronos-t5-large'
DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'

# bfloat16 weights on GPU, float32 on CPU.
if DEVICE == 'cuda':
    model_dtype = torch.bfloat16
else:
    model_dtype = torch.float32

# NOTE(review): the recorded run prints "`torch_dtype` is deprecated! Use `dtype` instead!";
# confirm the minimum supported transformers version before renaming the keyword.
print(f'Loading {MODEL_NAME}...')
pipeline = ChronosPipeline.from_pretrained(MODEL_NAME, device_map=DEVICE, torch_dtype=model_dtype)
print('Model loaded successfully!')

Loading amazon/chronos-t5-large...


config.json: 0.00B [00:00, ?B/s]

`torch_dtype` is deprecated! Use `dtype` instead!


model.safetensors:   0%|          | 0.00/2.84G [00:00<?, ?B/s]

generation_config.json:   0%|          | 0.00/142 [00:00<?, ?B/s]

Model loaded successfully!


## Load BVMT Data from GitHub

In [None]:
BASE_URL = 'https://raw.githubusercontent.com/hecfaitdepartment/cahier-de-charges-code_lab2.0/main/'


def load_bvmt_data(year, timeout=30):
    """Download and parse one year of BVMT quotation history.

    The files are semicolon-delimited with space-padded fields (the header
    reads ``SEANCE    ;GROUPE    ;CODE ...``). Reading them with the default
    comma separator collapses every row into a single column, so the text is
    parsed with ``sep=';'`` and the padding is stripped from column names and
    text cells.

    Args:
        year: Year used to build the file name ``histo_cotation_{year}.csv``.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        A DataFrame with one column per field, or ``None`` when the download
        or the parsing failed (the error is printed, not raised).
    """
    filename = f'histo_cotation_{year}.csv'
    url = BASE_URL + filename
    try:
        print(f'Loading {filename}...')
        # A timeout keeps a stalled connection from hanging the notebook.
        response = requests.get(url, timeout=timeout)
        response.raise_for_status()
        df = pd.read_csv(StringIO(response.text), sep=';', skipinitialspace=True)
        # Headers are padded to a fixed width; strip them so lookups by name work.
        df.columns = [str(col).strip() for col in df.columns]
        # Text cells carry the same padding; leave numeric columns untouched.
        for col in df.columns:
            if not pd.api.types.is_numeric_dtype(df[col]):
                df[col] = df[col].map(lambda v: v.strip() if isinstance(v, str) else v)
        print(f'  Loaded {len(df):,} records')
        return df
    except Exception as e:
        # Best-effort loader: a failing year is reported and skipped by the caller.
        print(f'  Error: {e}')
        return None

# Load each year in turn; a year whose loader returned None is left out of the mapping.
all_data = {}
for year in (2022, 2023, 2024, 2025):
    df = load_bvmt_data(year)
    if df is None:
        continue
    all_data[year] = df

print(f'Loaded {len(all_data)} years of data')

Loading histo_cotation_2022.csv...
  Loaded 83,939 records
Loading histo_cotation_2023.csv...
  Error: Error tokenizing data. C error: Expected 1 fields in line 78, saw 2

Loading histo_cotation_2024.csv...
  Error: Error tokenizing data. C error: Expected 1 fields in line 73, saw 2

Loading histo_cotation_2025.csv...
  Error: Error tokenizing data. C error: Expected 1 fields in line 71, saw 2

Loaded 1 years of data


In [None]:
# Preview the most recent year that was loaded: its column names and first rows.
if all_data:
    sample_year = max(all_data)
    sample_df = all_data[sample_year]
    print(f'Sample from {sample_year}:')
    print(f'Columns: {[*sample_df.columns]}')
    display(sample_df.head())

Sample from 2022:
Columns: ['SEANCE    ;GROUPE    ;CODE        ;VALEUR          ;OUVERTURE ;CLOTURE    ;PLUS_BAS   ;PLUS_HAUT ;QUANTITE_NEGOCIEE;NB_TRANSACTION ;CAPITAUX ']


Unnamed: 0,SEANCE ;GROUPE ;CODE ;VALEUR ;OUVERTURE ;CLOTURE ;PLUS_BAS ;PLUS_HAUT ;QUANTITE_NEGOCIEE;NB_TRANSACTION ;CAPITAUX
0,03/01/2022 ;11 ;TN0001000108;MONOPRIX...
1,03/01/2022 ;11 ;TN0001100254;SFBT ...
2,03/01/2022 ;11 ;TN0001600154;ATTIJARI...
3,03/01/2022 ;11 ;TN0001800457;BIAT ...
4,03/01/2022 ;11 ;TN0001900604;BH BANK ...


## Process and Combine Data

In [None]:
# Normalise the column names of every yearly frame, tag rows with their source
# year, and stack everything into one DataFrame.
if not all_data:
    # pd.concat([]) would fail with an opaque "No objects to concatenate";
    # stop here with a message that points at the real cause.
    raise ValueError('No BVMT data was loaded; cannot build the combined dataset.')

all_frames = []
for year, df in all_data.items():
    # Work on a copy so the raw per-year frames stay untouched.
    df_copy = df.copy()
    df_copy.columns = df_copy.columns.str.strip().str.lower()
    if 'year' not in df_copy.columns:
        df_copy['year'] = year
    all_frames.append(df_copy)

full_data = pd.concat(all_frames, ignore_index=True)
print(f'Combined: {len(full_data):,} records')
print(f'Columns: {list(full_data.columns)}')

Combined: 83,939 records
Columns: ['seance    ;groupe    ;code        ;valeur          ;ouverture ;cloture    ;plus_bas   ;plus_haut ;quantite_negociee;nb_transaction ;capitaux', 'year']


## Extract Stock Time Series

In [None]:
def find_column(columns, candidates, default=None):
    """Return the actual column label matching the first hit in `candidates`.

    Matching ignores case and surrounding whitespace. Candidates are tried in
    order, and `default` is returned when none of them is present.
    """
    by_key = {}
    for col in columns:
        # Keep the first label seen for each normalised key.
        by_key.setdefault(str(col).strip().lower(), col)
    for name in candidates:
        if name in by_key:
            return by_key[name]
    return default


# Find column names
cols_lower = [str(c).lower() for c in full_data.columns]
print(f'Available columns: {cols_lower}')

# Resolve the stock, date, and price columns to labels that really exist in
# full_data, so later indexing with them cannot miss on case or padding.
stock_col = find_column(full_data.columns, ['valeur', 'ticker', 'symbole'])
if stock_col is None:
    # Same fallback as before (first column), but no longer silent: grouping by
    # an arbitrary column yields meaningless "stocks".
    stock_col = full_data.columns[0]
    print(f'WARNING: no stock identifier column recognised; falling back to {stock_col!r}')
date_col = find_column(full_data.columns, ['date', 'jour', 'seance'])
price_col = find_column(full_data.columns, ['dernier', 'close', 'cloture', 'last'])

print(f'Stock column: {stock_col}')
print(f'Date column: {date_col}')
print(f'Price column: {price_col}')

Available columns: ['seance    ;groupe    ;code        ;valeur          ;ouverture ;cloture    ;plus_bas   ;plus_haut ;quantite_negociee;nb_transaction ;capitaux', 'year']
Stock column: seance    ;groupe    ;code        ;valeur          ;ouverture ;cloture    ;plus_bas   ;plus_haut ;quantite_negociee;nb_transaction ;capitaux
Date column: None
Price column: None


In [None]:
# Extract individual stocks
MAX_STOCKS = 10         # only the first N distinct identifiers are examined
MIN_OBSERVATIONS = 50   # a series is kept only if it has more rows than this

stocks_data = {}
unique_stocks = full_data[stock_col].unique()
print(f'Found {len(unique_stocks)} unique stocks')

for stock in unique_stocks[:MAX_STOCKS]:
    stock_df = full_data[full_data[stock_col] == stock].copy()

    if date_col and date_col in stock_df.columns:
        # The sample rows start at "03/01/2022", which reads as 3 January
        # (day-first); without dayfirst=True pandas would parse it month-first.
        # Unparseable dates become NaT and are dropped instead of being hidden
        # behind a bare except, since such rows cannot be ordered in time.
        stock_df[date_col] = pd.to_datetime(stock_df[date_col], dayfirst=True, errors='coerce')
        stock_df = stock_df.dropna(subset=[date_col]).sort_values(date_col)

    if price_col and price_col in stock_df.columns:
        # Non-numeric prices become NaN and the affected rows are removed.
        stock_df[price_col] = pd.to_numeric(stock_df[price_col], errors='coerce')
        stock_df = stock_df.dropna(subset=[price_col])

    if len(stock_df) > MIN_OBSERVATIONS:
        stocks_data[stock] = stock_df

print(f'Extracted {len(stocks_data)} stocks with sufficient data')
for i, (stock, df) in enumerate(list(stocks_data.items())[:5], 1):
    print(f'{i}. {stock}: {len(df)} observations')

Found 83939 unique stocks
Extracted 0 stocks with sufficient data


## Visualize Historical Prices

In [None]:
# Overlay the price history of up to five extracted stocks on one figure.
if stocks_data and price_col:
    selected_stocks = list(stocks_data.keys())[:5]
    colors = ['blue', 'orange', 'green', 'red', 'purple']

    fig = go.Figure()
    for stock, color in zip(selected_stocks, colors):
        df = stocks_data[stock]
        # Use the date column for the x-axis when present, otherwise the row index.
        if date_col in df.columns:
            x_data = df[date_col]
        else:
            x_data = df.index

        trace = go.Scatter(
            x=x_data,
            y=df[price_col],
            mode='lines',
            name=stock,
            line=dict(width=2, color=color),
        )
        fig.add_trace(trace)

    fig.update_layout(
        title='BVMT Stock Prices',
        xaxis_title='Date',
        yaxis_title='Price (TND)',
        height=500,
    )
    fig.show()

## Run Chronos Forecast

In [None]:
# Forecast the first extracted stock with Chronos and summarise the sampled paths.
if stocks_data and price_col:
    FORECAST_STOCK = list(stocks_data.keys())[0]
    PREDICTION_LENGTH = 30
    SEED = 42  # fixed seed so the sampled forecast can be reproduced on re-run

    print(f'Forecasting for: {FORECAST_STOCK}')
    print(f'Prediction horizon: {PREDICTION_LENGTH} days')

    df = stocks_data[FORECAST_STOCK]
    # Force a float array: np.isnan raises on object-dtype values, which is what
    # the column holds if it was never converted to numbers upstream.
    prices = pd.to_numeric(df[price_col], errors='coerce').to_numpy(dtype=float)
    prices = prices[~np.isnan(prices)]

    context = torch.tensor(prices, dtype=torch.float32)

    print(f'Historical data points: {len(prices)}')
    print(f'Running inference...')

    # Presumably the sampler draws from torch's global RNG; seed it right before
    # predicting so repeated runs give the same samples — TODO confirm.
    torch.manual_seed(SEED)
    with torch.no_grad():
        forecast = pipeline.predict(
            context,
            prediction_length=PREDICTION_LENGTH,
            num_samples=100
        )

    print(f'Forecast shape: {forecast.shape}')

    # forecast[0] selects the single input series; assumes the remaining axes are
    # (samples, horizon), so axis=0 takes quantiles across samples — TODO confirm.
    forecast_numpy = forecast[0].numpy()
    q10, median, q90 = np.quantile(forecast_numpy, [0.1, 0.5, 0.9], axis=0)

    # Average of the median path over the whole horizon.
    print(f'Median forecast: {median.mean():.2f} TND')

## Visualize Forecast

In [None]:
def next_trading_days(last_date, periods):
    """Return `periods` business days (Mon-Fri) strictly after `last_date`.

    Each forecast step follows one trading session, so the forecast axis must
    skip weekends; plain calendar days would stretch the forecast across days
    on which no session takes place. Public holidays are not accounted for.
    """
    first_candidate = pd.to_datetime(last_date) + pd.Timedelta(days=1)
    return pd.date_range(start=first_candidate, periods=periods, freq='B')


# Plot the history, the median forecast, and the 10%-90% band.
if 'forecast' in locals() and price_col:
    df = stocks_data[FORECAST_STOCK]

    if date_col in df.columns:
        dates = df[date_col].values
        forecast_dates = next_trading_days(dates[-1], PREDICTION_LENGTH)
    else:
        # No usable date column: fall back to integer positions on the x-axis.
        dates = np.arange(len(prices))
        forecast_dates = np.arange(len(prices), len(prices) + PREDICTION_LENGTH)

    fig = go.Figure()

    fig.add_trace(go.Scatter(
        x=dates,
        y=prices,
        mode='lines',
        name='Historical',
        line=dict(color='blue', width=2)
    ))

    fig.add_trace(go.Scatter(
        x=forecast_dates,
        y=median,
        mode='lines',
        name='Forecast',
        line=dict(color='red', width=3)
    ))

    # Closed polygon: upper bound left-to-right, then lower bound right-to-left.
    fig.add_trace(go.Scatter(
        x=np.concatenate([forecast_dates, forecast_dates[::-1]]),
        y=np.concatenate([q90, q10[::-1]]),
        fill='toself',
        fillcolor='rgba(255, 0, 0, 0.2)',
        line=dict(color='rgba(255,255,255,0)'),
        name='80% Interval'
    ))

    fig.update_layout(
        title=f'Chronos Forecast: {FORECAST_STOCK}',
        xaxis_title='Date',
        yaxis_title='Price (TND)',
        height=600
    )
    fig.show()

## Summary

This notebook demonstrated:
- Loading BVMT stock data from GitHub
- Using Chronos-T5 for forecasting
- Generating probabilistic predictions

Model: amazon/chronos-t5-large (710M parameters)