# Data acquisition and processing demo

This notebook demonstrates the current capabilities of the **Funding Rates Arbitrage** project, a system for real-time cryptocurrency data acquisition, processing, analysis and trading.

Key Features Demonstrated:
- **Multi-exchange data collection with CCXT** (Binance, Hyperliquid)
- **Real-time time series management** with shared memory
- **Built series with dependencies** (technical indicators or strategies)
- **Database persistence** with DuckDB
- **MPI-based** shared memory

The project aims to enable sophisticated trading strategies by providing efficient data structures, real-time processing, and multi-process coordination.

## Imports and Initialization

First, we import the core components of the package:

In [1]:
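from core.data.models.time_series import *
from core.data.streaming.DataRegistry import *

# Explicit imports for names used later in the notebook (time.time, np.mean, np.argmax)
import time
import numpy as np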



### (1) MPI Setup for Multi-Process Communication

The package uses **mpi4py** for efficient multi-process communication and shared memory management:

In [2]:
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()

### (2) DataRegistry: Central Time Series Management

The **DataRegistry** is the core component that manages all time series data with:
- UUID-based indexing
- Metadata-based lookup
- Dependency management
- Database integration
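A minimal sketch of how UUID-based indexing and metadata-based lookup can fit together. `MiniRegistry`, `register` and `get` are hypothetical names chosen for illustration, not the project's `DataRegistry` API; the metadata tuple follows the (exchange, symbol, data_type, timeframe) convention used throughout this notebook.

```python
import uuid
from typing import Any, Dict, Tuple, Union

# Metadata key: (exchange, symbol, data_type, timeframe in ms)
Metadata = Tuple[str, str, str, int]


class MiniRegistry:
    """Hypothetical registry: series stored by UUID, plus a metadata -> UUID index."""

    def __init__(self) -> None:
        self._series: Dict[str, Any] = {}
        self._metadata_index: Dict[Metadata, str] = {}

    def register(self, metadata: Metadata, series: Any) -> str:
        # Design choice for this sketch: reuse the UUID if the metadata is already known
        if metadata in self._metadata_index:
            return self._metadata_index[metadata]
        series_id = str(uuid.uuid4())
        self._series[series_id] = series
        self._metadata_index[metadata] = series_id
        return series_id

    def get(self, key: Union[str, Metadata]) -> Any:
        # Accept either a UUID string or a metadata tuple
        if isinstance(key, tuple):
            key = self._metadata_index[key]
        return self._series[key]


demo = MiniRegistry()
series_id = demo.register(("binance", "BTC/USDT", "OHLCV", 60000), "1m BTC/USDT candles")
print(demo.get(series_id) == demo.get(("binance", "BTC/USDT", "OHLCV", 60000)))  # True
```

Keeping the metadata index as a plain dictionary makes lookups by tuple O(1), which is also why the keys must be hashable tuples rather than lists.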

In [3]:
registry = DataRegistry(comm, rank)

## Creating Time Series acquired from crypto exchanges

We create multiple time series from different exchanges with various data types and timeframes:

- **Binance BTC/USDT OHLCV** (1-minute intervals)
- **Binance BTC/USDT:USDT Funding Rate** (8-hour intervals)
- **Binance BTC/USDT:USDT OHLCV** (1-minute intervals)
- **Binance BTC/USDT:USDT current Funding Rate** (10-second intervals)
- **Hyperliquid BTC/USDC:USDC Funding Rate** (1-hour intervals)

Durations are given in milliseconds (60000 ms = 1 minute). Each series uses `cache=4`, which keeps its 4 most recent data points in memory.
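The outputs later in this notebook show 4 timestamps for a series created with `cache=4` and 2 for `cache=2`, so the cache behaves like a fixed-size window over the latest points. A minimal sketch of such a window, assuming the convention used later in the notebook that a timestamp of $-1$ marks an empty slot; `RollingCache` is a hypothetical class, not the project's shared-memory implementation.

```python
from collections import deque
from typing import Deque, List, Optional, Sequence


class RollingCache:
    """Hypothetical fixed-size cache holding the `size` most recent points.

    Slots that have not been filled yet carry the timestamp -1 (no data).
    """

    def __init__(self, size: int, width: int) -> None:
        self.width = width
        # A deque with maxlen silently drops its oldest item on each append
        self.timestamps: Deque[int] = deque([-1] * size, maxlen=size)
        self.data: Deque[Optional[List[float]]] = deque([None] * size, maxlen=size)

    def append(self, timestamp: int, values: Sequence[float]) -> None:
        if len(values) != self.width:
            raise ValueError(f"expected {self.width} values, got {len(values)}")
        self.timestamps.append(timestamp)
        self.data.append(list(values))


demo_cache = RollingCache(size=4, width=5)
for minute in range(6):  # six 1-minute candles; only the last four are kept
    demo_cache.append(1757314800000 + minute * 60000, [1.0, 2.0, 0.5, 1.5, 10.0])
print(list(demo_cache.timestamps))
# [1757314920000, 1757314980000, 1757315040000, 1757315100000]
```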

In [4]:
# Arguments: exchange, symbol, data type, duration in ms, in-memory cache size
registry.create_third_party_series("binance", "BTC/USDT", "OHLCV", 60000, cache=4)
registry.create_third_party_series("binance", "BTC/USDT:USDT", "fundingRate", 60000 * 60 * 8, cache=4)
registry.create_third_party_series("binance", "BTC/USDT:USDT", "OHLCV", 60000, cache=4)
registry.create_third_party_series("binance", "BTC/USDT:USDT", "tmpfundingRate", 10000, cache=4)
registry.create_third_party_series("hyperliquid", "BTC/USDC:USDC", "fundingRate", 60000 * 60, cache=4)

'81025db5-48a2-46f8-a5e3-5aba2296a6e0'

## Subscription Management

The project can save the CCXT subscriptions to a file and load them back later, so the configuration persists across sessions:

In [5]:
registry.save_ccxt_subscriptions("sub.json")

We can recreate the registry and load previously saved subscriptions. This demonstrates:
- **Configuration persistence**
- **Registry reconstruction**
- **Automatic series creation** from metadata

In [6]:
registry = DataRegistry(comm, rank)
registry.load_series_metadata("sub.json")
registry.create_third_party_from_file("sub.json")

['face875f-94c5-4e05-a9e1-bd9b96b1cd4f',
 'd921b275-3cd3-48a9-adbf-fd6eee835928',
 '733af7e7-4be4-4ab2-b28e-404908e6db43',
 '9f0ba70f-5225-4dbd-aaa2-824b20c75c0c',
 '26eb92b1-9729-4e05-8b40-2670deb3341f']
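The format of `sub.json` is not shown in this notebook. A minimal sketch of the idea, assuming subscriptions are stored as a JSON list of metadata entries; `save_subscriptions` and `load_subscriptions` are hypothetical helpers. One detail worth handling: JSON has no tuple type, so tuples come back as lists and must be converted again before they can serve as dictionary keys.

```python
import json
import os
import tempfile
from typing import List, Tuple

# (exchange, symbol, data_type, timeframe in ms)
Metadata = Tuple[str, str, str, int]


def save_subscriptions(path: str, subscriptions: List[Metadata]) -> None:
    """Hypothetical helper: write metadata tuples as a JSON list of arrays."""
    with open(path, "w", encoding="utf-8") as f:
        json.dump([list(s) for s in subscriptions], f, indent=2)


def load_subscriptions(path: str) -> List[Metadata]:
    """Hypothetical helper: read them back, turning JSON arrays into tuples."""
    with open(path, encoding="utf-8") as f:
        # Tuples are hashable, so they can be used as metadata-index keys again
        return [tuple(entry) for entry in json.load(f)]


subs = [
    ("binance", "BTC/USDT", "OHLCV", 60000),
    ("hyperliquid", "BTC/USDC:USDC", "fundingRate", 3600000),
]
with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "sub_demo.json")
    save_subscriptions(path, subs)
    restored = load_subscriptions(path)
print(restored == subs)  # True
```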

## Retrieving Time Series Objects

Once created in the DataRegistry, time series can be retrieved using their metadata tuple (exchange, symbol, data_type, timeframe):

In [7]:
ohlcv = registry.get_series( ("binance","BTC/USDT","OHLCV",60000) )
fundingRate = registry.get_series( ("hyperliquid","BTC/USDC:USDC","fundingRate",60000 * 60) )
fundingRatebis = registry.get_series( ("binance","BTC/USDT:USDT","fundingRate",60000 * 60 * 8) )
ohlcv_bis = registry.get_series( ("binance","BTC/USDT:USDT","OHLCV",60000) )
tmpfundingRatebis = registry.get_series( ("binance","BTC/USDT:USDT","tmpfundingRate",10000) )

## Built Series with Dependencies

The project includes code to create **built series** that depend on other time series. This enables:

- **Real-time technical analysis**
- **Multi-series calculations**
- **Dependency-based updates**
- **Integration with TA-Lib**

Here we create a few built series for illustration.

In [8]:
def ma_transition(deps, prev_data):
    ohlcv = deps[0]
    return np.mean(ohlcv.data[-2:], axis=0)

ma_id = registry.create_built_series(
    duration = 60000,
    transition_function=ma_transition,
    dependencies=[("binance", "BTC/USDT", "OHLCV",60000)],  # Reference by metadata
    data_shape=5,  # Output shape
    cache = 2,
    symbol = "MA2BTC/USDT",
    serie_type = "OHLCV"
)
ma_serie = registry.get_series(ma_id)

In [9]:
from core.data.processing.transforms import ta_lib

ma_transition_talib = lambda x,y : ta_lib(x,y,"sma",timeperiod=2)

ma_talib_id = registry.create_built_series(
    duration = 60000,
    transition_function = ma_transition_talib,
    dependencies= [("binance", "BTC/USDT", "OHLCV",60000)],  # Reference by metadata
    data_shape = 5,  # Output shape
    cache = 4,
    symbol = "smothed_BTC/USDT",
    serie_type = "OHLCV"
)
ma_talib_serie = registry.get_series(ma_talib_id)

In [10]:
def max_transition(deps, prev_data):
    tmp = np.array( [ serie_data[-1] for serie_data in deps ] )
    return np.argmax(tmp, axis=0)

max_id = registry.create_built_series(
    duration = 60000,
    transition_function = max_transition,
    dependencies=[  ("binance", "BTC/USDT", "OHLCV",60000),
                    ('local', 'MA2BTC/USDT', 'OHLCV', 60000)],  # Reference by metadata
    data_shape=5,  # Output shape
    cache = 2,
    symbol = "MAX2BTC/USDT",
    serie_type = "max"
)
max_serie = registry.get_series(max_id)
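To make the two custom transition functions above concrete, here is what they compute on toy arrays (made-up numbers, not market data; plain NumPy arrays stand in for the series objects). `ma_transition` averages the last two rows column by column. `max_transition` stacks the latest row of each dependency and, with `np.argmax(..., axis=0)`, returns for each column the index of the dependency holding the larger value: 0 for the raw OHLCV series, 1 for the moving average, in the order the dependencies are listed. This is how to read a `max_serie` value such as `[1., 1., 1., 0., 1.]`.

```python
import numpy as np

# Two toy OHLCV rows: open, high, low, close, volume
raw = np.array([[100.0, 104.0, 99.0, 103.0, 2.0],
                [103.0, 105.0, 101.0, 102.0, 4.0]])

# Same computation as ma_transition: column-wise mean of the last two rows
ma_row = np.mean(raw[-2:], axis=0)
print(ma_row)

# Same computation as max_transition: which dependency has the larger latest value?
latest = np.array([raw[-1], ma_row])  # row 0: raw series, row 1: moving average
print(np.argmax(latest, axis=0))      # [0 0 0 1 0]
```

Only the close column favours the moving average here (102.5 against 102.0), so only that position holds 1.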

In [11]:
from core.data.processing.transforms import ohlcv_from_ohlcv

one_min_to_three_min = lambda x,y : ohlcv_from_ohlcv(x, y, 60000, 3*60000)

three_min_btc_id = registry.create_built_series(
    duration = 3 * 60000,
    transition_function = one_min_to_three_min,
    dependencies=[  ("binance", "BTC/USDT", "OHLCV",60000)],  # Reference by metadata
    data_shape=5,  # Output shape
    cache = 2,
    symbol = "BTC/USDT",
    serie_type = "OHLCV"
)
three_min_btc = registry.get_series(three_min_btc_id)
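The internals of `ohlcv_from_ohlcv` are not shown in this notebook. As an illustration of the usual OHLCV aggregation rule it presumably follows, here is a hypothetical `resample_ohlcv` helper: candles are bucketed by flooring their timestamp to a multiple of the target duration, then open = first open, high = max high, low = min low, close = last close, volume = sum of volumes. This sketch keeps incomplete buckets; the project's function may treat them differently.

```python
from typing import Dict, List, Sequence, Tuple

# (timestamp_ms, open, high, low, close, volume)
Candle = Tuple[int, float, float, float, float, float]


def resample_ohlcv(candles: Sequence[Candle], target_ms: int) -> List[Candle]:
    """Hypothetical helper: aggregate time-sorted candles into target_ms buckets."""
    buckets: Dict[int, List[float]] = {}
    for ts, o, h, l, c, v in candles:
        start = ts - ts % target_ms  # floor the timestamp to its bucket start
        if start not in buckets:
            buckets[start] = [o, h, l, c, v]
        else:
            b = buckets[start]
            b[1] = max(b[1], h)  # highest high
            b[2] = min(b[2], l)  # lowest low
            b[3] = c             # close of the latest candle
            b[4] += v            # total volume
    return [(start, *vals) for start, vals in sorted(buckets.items())]


one_min = [
    (1757314800000, 10.0, 12.0, 9.0, 11.0, 1.0),
    (1757314860000, 11.0, 13.0, 10.0, 12.0, 2.0),
    (1757314920000, 12.0, 12.5, 8.0, 9.5, 3.0),
    (1757314980000, 9.5, 10.0, 9.0, 9.8, 1.0),
]
print(resample_ohlcv(one_min, 3 * 60000))
# [(1757314800000, 10.0, 13.0, 8.0, 9.5, 6.0), (1757314980000, 9.5, 10.0, 9.0, 9.8, 1.0)]
```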

The registry maintains a metadata index that maps series metadata to their UUIDs.

Below we check that the metadata index contains every series we created:

In [12]:
registry._metadata_index

{('binance',
  'BTC/USDT',
  'OHLCV',
  60000): 'face875f-94c5-4e05-a9e1-bd9b96b1cd4f',
 ('binance',
  'BTC/USDT:USDT',
  'fundingRate',
  28800000): 'd921b275-3cd3-48a9-adbf-fd6eee835928',
 ('binance',
  'BTC/USDT:USDT',
  'OHLCV',
  60000): '733af7e7-4be4-4ab2-b28e-404908e6db43',
 ('binance',
  'BTC/USDT:USDT',
  'tmpfundingRate',
  10000): '9f0ba70f-5225-4dbd-aaa2-824b20c75c0c',
 ('hyperliquid',
  'BTC/USDC:USDC',
  'fundingRate',
  3600000): '26eb92b1-9729-4e05-8b40-2670deb3341f',
 ('local',
  'MA2BTC/USDT',
  'OHLCV',
  60000): '068f560b-c3c2-4e7d-a593-a951876726c2',
 ('local',
  'smothed_BTC/USDT',
  'OHLCV',
  60000): '244c5485-9488-4aac-9698-308bf9c9044e',
 ('local',
  'MAX2BTC/USDT',
  'max',
  60000): '36fcb84d-a2fa-473b-9d90-c4bc8acdbe8b',
 ('local',
  'BTC/USDT',
  'OHLCV',
  180000): 'c0029a8b-2f61-453e-8d8a-8d69f1ecf36d'}

## Database Integration with DuckDB

The package integrates with **DuckDB** for data persistence and retrieval:

- **Automatic data loading** from existing database
- **Historical data persistence**
- **Efficient columnar storage**
- **SQL-compatible queries**

We load any data already stored in the database. Since `test.db` does not contain these series yet, each of them is reported as missing:

In [13]:
import duckdb
conn = duckdb.connect('test.db')
loaded, fail_to_load = registry.load_all_from_db(conn)


Series not found in database:
- binance BTC/USDT OHLCV 60000
- binance BTC/USDT:USDT fundingRate 28800000
- binance BTC/USDT:USDT OHLCV 60000
- binance BTC/USDT:USDT tmpfundingRate 10000
- hyperliquid BTC/USDC:USDC fundingRate 3600000
- local MA2BTC/USDT OHLCV 60000
- local smothed_BTC/USDT OHLCV 60000
- local MAX2BTC/USDT max 60000
- local BTC/USDT OHLCV 180000


## Data acquisition from crypto exchanges

We use the CCXT library to acquire data from the crypto exchanges. The acquisition relies on asyncio to run the requests concurrently.
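The adapter's internals are not shown here. The pattern that lets one process poll several markets at once can be sketched with `asyncio.gather`; `fetch_latest` is a hypothetical stub that sleeps instead of calling an exchange, so the sketch runs without network access (with real CCXT, the async clients in `ccxt.async_support` would play this role). It is written as a standalone script: inside Jupyter, where an event loop is already running, `await poll_all()` replaces the `asyncio.run` call.

```python
import asyncio
import time
from typing import List, Tuple


async def fetch_latest(exchange: str, symbol: str, delay_s: float) -> Tuple[str, str]:
    # Hypothetical stub: the sleep stands in for network latency
    await asyncio.sleep(delay_s)
    return exchange, symbol


async def poll_all() -> List[Tuple[str, str]]:
    # All requests are in flight at the same time; results keep the call order
    return await asyncio.gather(
        fetch_latest("binance", "BTC/USDT", 0.2),
        fetch_latest("binance", "BTC/USDT:USDT", 0.2),
        fetch_latest("hyperliquid", "BTC/USDC:USDC", 0.2),
    )


start = time.perf_counter()
results = asyncio.run(poll_all())
elapsed = time.perf_counter() - start
print(results, f"{elapsed:.1f}s")  # roughly 0.2s in total rather than 0.6s
```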

In [14]:
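import time

# Start acquisition one hour before the beginning of the current 8-hour
# funding period (all timestamps in milliseconds since the epoch)
eight_hours_ms = 8 * 60 * 60000
start_time = int(time.time() * 1000 // eight_hours_ms) * eight_hours_ms - 60 * 60000
ccxt_feeder = registry.subscribe_to_ccxt_adapter(start_time=start_time, conn=conn)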
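The `start_time` expression floors the current time in milliseconds to a multiple of 8 hours (epoch-aligned, so the boundaries fall at 00:00, 08:00 and 16:00 UTC) and then steps back one hour. Written as a small hypothetical helper with a fixed timestamp instead of `time.time()`, the arithmetic is:

```python
EIGHT_HOURS_MS = 8 * 60 * 60_000
ONE_HOUR_MS = 60 * 60_000


def acquisition_start(now_ms: int) -> int:
    """Hypothetical helper: start of the current 8-hour period, minus one hour, in ms."""
    period_start = now_ms // EIGHT_HOURS_MS * EIGHT_HOURS_MS  # floor to the period boundary
    return period_start - ONE_HOUR_MS


# 2025-09-08 10:18:50 UTC -> period starts at 08:00 UTC -> start at 07:00 UTC
print(acquisition_start(1757326730000))  # 1757314800000
```

The result, 1757314800000 (2025-09-08 07:00:00 UTC), is the first non-empty timestamp in the database table at the end of this notebook. For positive timestamps, integer floor division gives the same result as the original `int(x / y) * y`.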

We start the data acquisition.

In [None]:
ccxt_feeder.run()

Start collecting :  Binance BTC/USDT OHLCV 1m
Start collecting :  Binance BTC/USDT:USDT OHLCV 1m
Start collecting :  Binance BTC/USDT:USDT fundingRate 8h
Start collecting :  Binance BTC/USDT:USDT tmpfundingRate 10s
Start collecting :  Hyperliquid BTC/USDC:USDC fundingRate 1h


______________________________________________________________________
2025-09-08 02:00:00 : successful acquisition of duration  28800000
______________________________________________________________________
2025-09-08 12:18:50 : successful acquisition of duration  10000
______________________________________________________________________
2025-09-08 09:00:00 : successful acquisition of duration  60000
______________________________________________________________________
2025-09-08 10:00:00 : successful acquisition of duration  28800000
______________________________________________________________________
2025-09-08 09:01:00 : successful acquisition of duration  60000
______________________________________________________________________
2025-09-08 09:02:00 : successful acquisition of duration  60000
______________________________________________________________________
2025-09-08 09:03:00 : successful acquisition of duration  60000
_________________________________________________

## Built series computation

In the code below, we rely on asyncio and Python's threading module to compute the built series concurrently with data acquisition, taking their dependencies into account, as soon as new data is available.

In practice, this computation is meant to run, via mpi4py, in a separate process from the one that acquires data from the exchanges.

In [16]:
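import asyncio
import threading
import time

async def update_built_series(registry):
    # Periodically ask the series manager to update all built series
    while True:
        await asyncio.sleep(0.5)  # non-blocking pause inside the event loop
        registry.series_manager.update_all()

# Run the update loop in its own event loop on a background thread,
# so the notebook stays responsive
runner_thread = threading.Thread(
    target=asyncio.run,
    args=(update_built_series(registry),),
    daemon=False,
)
runner_thread.start()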

______________________________________________________________________
2025-09-08 09:00:00 : successful computation of   local MA2BTC/USDT OHLCV 60000
______________________________________________________________________
2025-09-08 09:00:00 : successful computation of   local BTC/USDT OHLCV 180000
______________________________________________________________________
2025-09-08 09:00:00 : successful computation of   local smothed_BTC/USDT OHLCV 60000
______________________________________________________________________
2025-09-08 09:00:00 : successful computation of   local MAX2BTC/USDT max 60000
______________________________________________________________________
2025-09-08 09:01:00 : successful computation of   local MA2BTC/USDT OHLCV 60000
______________________________________________________________________
2025-09-08 09:01:00 : successful computation of   local smothed_BTC/USDT OHLCV 60000
______________________________________________________________________
2025-09-08 09:01

## Outcome

We check that everything is working by looking at the timestamps (the value $-1$ means no data).

Note that *max_serie* depends on both *ma_serie* and *ohlcv*, while *ma_serie* itself depends on *ohlcv*.
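These dependencies imply an update order: a built series should be recomputed only after everything it depends on. How `series_manager.update_all()` resolves this is not shown here; as an illustration, Python's standard `graphlib.TopologicalSorter` (Python 3.9+) yields a valid order for the dependency graph of this notebook. The node names are just labels for the series created above.

```python
from graphlib import TopologicalSorter

# Each built series maps to the series it depends on (mirrors the cells above)
update_dependencies = {
    "ma_serie": {"ohlcv"},
    "ma_talib_serie": {"ohlcv"},
    "max_serie": {"ohlcv", "ma_serie"},
    "three_min_btc": {"ohlcv"},
}

update_order = list(TopologicalSorter(update_dependencies).static_order())
# "ohlcv" always comes first and "max_serie" always after "ma_serie";
# the relative order of independent series may vary between runs
print(update_order)
```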

In [17]:
# wait for some data to be acquired
time.sleep(10)

ohlcv.timestamps, ma_serie.timestamps, max_serie.timestamps

(array([[1.75731528e+12],
        [1.75731534e+12],
        [1.75731540e+12],
        [1.75731546e+12]]),
 array([[1.75731534e+12],
        [1.75731540e+12]]),
 array([[1.75731534e+12],
        [1.75731540e+12]]))

We look at the last values of each time series considered above.

In [18]:
ohlcv.data[-2:], ma_serie.data[-2:], max_serie.data[-1]

(array([[1.1118952e+05, 1.1119613e+05, 1.1117614e+05, 1.1119613e+05,
         1.6825700e+00],
        [1.1119612e+05, 1.1121528e+05, 1.1119612e+05, 1.1121527e+05,
         2.0478100e+00]]),
 array([[1.11235995e+05, 1.11236000e+05, 1.11198500e+05, 1.11205765e+05,
         7.47965000e+00],
        [1.11205760e+05, 1.11209065e+05, 1.11182830e+05, 1.11192830e+05,
         3.64426500e+00]]),
 array([1., 1., 1., 0., 1.]))

We also check the timestamps for the 3m OHLCV built from the 1m OHLCV series.

In [19]:
ohlcv.timestamps, three_min_btc.timestamps

(array([[1.75731528e+12],
        [1.75731534e+12],
        [1.75731540e+12],
        [1.75731546e+12]]),
 array([[1.75731516e+12],
        [1.75731534e+12]]))

We load all the data stored in the database for the Binance 1m BTC/USDT OHLCV series.

In [20]:
table_name = generate_table_name("binance","BTC/USDT","OHLCV",60000)
#table_name = generate_table_name("binance","BTC/USDT:USDT","fundingRate",60000*8*60)
query = f"SELECT * FROM {table_name} ORDER BY timestamp"
df = conn.execute(query).fetchdf()
df

| | timestamp | data_0 | data_1 | data_2 | data_3 | data_4 |
|---:|---:|---:|---:|---:|---:|---:|
| 0 | -1 | | | | | |
| 1 | 1757314800000 | 111113.257812 | 111149.960938 | 111113.25 | 111148.90625 | 1.8278 |
| 2 | 1757314860000 | 111148.898438 | 111200.0 | 111148.898438 | 111184.742188 | 5.49058 |
| 3 | 1757314920000 | 111184.742188 | 111184.742188 | 111175.8125 | 111175.8125 | 3.71153 |
| 4 | 1757314980000 | 111175.820312 | 111184.992188 | 111157.59375 | 111184.976562 | 2.82053 |
| 5 | 1757315040000 | 111184.992188 | 111217.3125 | 111171.992188 | 111217.3125 | 6.66103 |
| 6 | 1757315100000 | 111217.3125 | 111229.726562 | 111217.296875 | 111229.726562 | 18.00021 |
| 7 | 1757315160000 | 111229.71875 | 111231.90625 | 111216.039062 | 111231.90625 | 12.03577 |
| 8 | 1757315220000 | 111231.921875 | 111250.0 | 111231.921875 | 111250.0 | 13.12868 |
| 9 | 1757315280000 | 111249.992188 | 111250.0 | 111207.476562 | 111222.0 | 9.35334 |
