In [3]:
import asyncio
import logging
from ratelimit import limits, sleep_and_retry
import ccxt.async_support as ccxt_async
from datetime import datetime

logger = logging.getLogger(__name__)

class DataFetcher:
    """Fetch tickers for a fixed list of trading pairs from several exchanges.

    Requests are issued concurrently through ccxt's async API and every
    ticker is reshaped into one flat dict layout by
    ``normalize_ticker_data``.

    Args:
        exchanges: Mapping of exchange name to an exchange class. Only the
            class ``__name__`` is used; the class that is actually
            instantiated is looked up on ``ccxt.async_support``.
    """

    def __init__(self, exchanges):
        self.exchanges = exchanges
        # Symbols requested from every exchange.
        self.trading_pairs = ['BTC/USD', 'ETH/USD', 'ETH/BTC', 'LTC/USD', 'XRP/USD']
        # exchange name -> asyncio.Semaphore, filled by fetch_all_tickers_async.
        self.semaphores = {}
        # Used as the semaphore size, i.e. the number of requests allowed in
        # flight per exchange at the same time (not a requests-per-second cap).
        self.rate_limits = {
            'coinbase': 3,
            'bitflyer': 1,
            'gemini': 1,
            'kraken': 1,
            'bitstamp': 2,
        }

    @staticmethod
    def _first_present(ticker_data, keys, array_key, array_index):
        """Return the first non-None value among ``keys``, else an array field.

        ``array_key`` names a list-valued raw field (for example Kraken's
        ``a``/``b``/``c``/``v`` entries); ``array_index`` selects the element
        inside it. Returns None when nothing usable is found, so a missing
        field can never raise ``IndexError``.
        """
        for key in keys:
            value = ticker_data.get(key)
            if value is not None:
                return value
        raw = ticker_data.get(array_key)
        if isinstance(raw, (list, tuple)) and len(raw) > array_index:
            return raw[array_index]
        return None

    @staticmethod
    def _parse_utc_seconds(datetime_str):
        """Parse an ISO 8601 'Z' string as UTC and return whole epoch seconds.

        Accepts the form with and without fractional seconds. Returns None
        when the string matches neither form.
        """
        for fmt in ("%Y-%m-%dT%H:%M:%S.%fZ", "%Y-%m-%dT%H:%M:%SZ"):
            try:
                parsed = datetime.strptime(datetime_str, fmt)
            except (TypeError, ValueError):
                continue
            # Subtract the epoch explicitly: naive ``.timestamp()`` would
            # interpret the value in the machine's local time zone.
            return int((parsed - datetime(1970, 1, 1)).total_seconds())
        return None

    def normalize_ticker_data(self, exchange_name, ticker_data):
        """Map one ticker dict onto the common output layout.

        Args:
            exchange_name: Label stored under ``'exchange'``.
            ticker_data: Ticker dict. Both the ccxt unified keys (``bid``,
                ``ask``, ``last``, ``baseVolume``) and a few raw exchange keys
                (``best_bid``/``best_ask``/``ltp``/``volume`` and the
                list-valued ``a``/``b``/``c``/``v``) are understood.

        Returns:
            dict with keys ``exchange``, ``symbol``, ``timestamp`` (epoch
            seconds), ``datetime`` (ISO 8601 string), ``bid``, ``ask``,
            ``last``, ``high``, ``low`` and ``volume``. Fields that cannot be
            determined are None.
        """
        # Local import keeps this method self-contained; the module only
        # imports ``datetime`` from the datetime package.
        from datetime import timezone

        timestamp = ticker_data.get('timestamp')
        datetime_str = ticker_data.get('datetime')

        if timestamp:
            # Millisecond epochs are ~1e12 today while second epochs are
            # ~1e9, so a magnitude test separates them even for floats
            # (a string-length test misreads e.g. 1727151664.5).
            if abs(timestamp) >= 1e11:
                timestamp = timestamp / 1000
            moment = datetime.fromtimestamp(timestamp, tz=timezone.utc)
            normalized_datetime = moment.replace(tzinfo=None).isoformat() + 'Z'
        elif datetime_str:
            normalized_datetime = datetime_str
            timestamp = self._parse_utc_seconds(datetime_str)
            if timestamp is None:
                logger.warning(
                    f"Could not parse datetime {datetime_str!r} from {exchange_name}"
                )
        else:
            normalized_datetime = None
            timestamp = None

        # Raw array fallbacks: 'b' is the bid array and 'a' the ask array,
        # price first; 'c' is the last trade; 'v'[1] is kept from the
        # original code (presumably the 24h volume - verify against the
        # exchange's ticker documentation).
        bid = self._first_present(ticker_data, ('bid', 'best_bid'), 'b', 0)
        ask = self._first_present(ticker_data, ('ask', 'best_ask'), 'a', 0)
        last = self._first_present(ticker_data, ('last', 'ltp'), 'c', 0)
        volume = self._first_present(ticker_data, ('baseVolume', 'volume'), 'v', 1)

        return {
            'exchange': exchange_name,
            'symbol': ticker_data.get('symbol'),
            'timestamp': timestamp,  # Unix timestamp (seconds)
            'datetime': normalized_datetime,  # ISO 8601 datetime
            'bid': bid,
            'ask': ask,
            'last': last,
            'high': ticker_data.get('high'),
            'low': ticker_data.get('low'),
            'volume': volume,
        }

    async def fetch_ticker_async(self, exchange_name, exchange, symbol):
        """Fetch and normalize one ticker.

        Args:
            exchange_name: Key into ``self.semaphores`` and label for logs.
            exchange: Zero-argument callable returning an async exchange that
                can be used as an async context manager.
            symbol: Market symbol such as ``'BTC/USD'``.

        Returns:
            The normalized ticker dict, or None when the exchange does not
            list ``symbol`` or when any error occurs (errors are logged, not
            raised).
        """
        async with self.semaphores[exchange_name]:
            try:
                logger.debug(f"Attempting to fetch {symbol} from {exchange_name}")
                async with exchange() as exchange_instance:
                    await exchange_instance.load_markets()
                    if symbol not in exchange_instance.markets:
                        logger.warning(f"{exchange_name} does not support {symbol}")
                        return None
                    ticker = await exchange_instance.fetch_ticker(symbol)
                    logger.debug(f"Successfully fetched {symbol} from {exchange_name}")
                    logger.debug(f"Raw ticker data for {symbol} from {exchange_name}: {ticker}")

                    processed_ticker = self.normalize_ticker_data(exchange_name, ticker)

                    logger.debug(f"Processed ticker data for {symbol} from {exchange_name}: {processed_ticker}")
                    return processed_ticker
            except ccxt_async.NetworkError as e:
                logger.error(f"Network error fetching {symbol} from {exchange_name}: {e}")
            except ccxt_async.ExchangeError as e:
                logger.error(f"Exchange error fetching {symbol} from {exchange_name}: {e}")
            except Exception as e:
                # Keep the traceback: this branch means a bug, not an outage.
                logger.exception(f"Unexpected error fetching {symbol} from {exchange_name}: {e}")
            return None

    async def fetch_all_tickers_async(self):
        """Fetch every trading pair from every configured exchange.

        A fresh semaphore per exchange is created on each call, sized from
        ``self.rate_limits`` (default 1).

        Returns:
            List of normalized ticker dicts; unsupported symbols and failed
            requests are left out.
        """
        tasks = []
        for exchange_name, exchange_class in self.exchanges.items():
            # Resolve by name on the async module (presumably so that classes
            # from the synchronous ccxt package can be passed in as well).
            exchange = getattr(ccxt_async, exchange_class.__name__)
            rate_limit = self.rate_limits.get(exchange_name, 1)
            self.semaphores[exchange_name] = asyncio.Semaphore(rate_limit)
            for symbol in self.trading_pairs:
                tasks.append(self.fetch_ticker_async(exchange_name, exchange, symbol))

        results = await asyncio.gather(*tasks, return_exceptions=True)
        valid_results = []
        for result in results:
            if isinstance(result, Exception):
                # Surface failures that escaped fetch_ticker_async instead of
                # dropping them silently.
                logger.error(f"Ticker task failed: {result!r}")
            elif result is not None:
                valid_results.append(result)
        logger.info(f"Fetched {len(valid_results)} valid tickers out of {len(results)} attempts")
        return valid_results

    def get_min_rate_limit(self):
        """Return the smallest value configured in ``self.rate_limits``."""
        return min(self.rate_limits.values())


In [None]:
import asyncio
from datetime import datetime
import ccxt.async_support as ccxt_async

class DataFetcher:
    """Fetch tickers for a fixed list of trading pairs from several exchanges.

    Progress is reported with ``print`` so it shows up in the notebook cell
    output.

    Args:
        exchanges: Mapping of exchange name to an exchange class. Only the
            class ``__name__`` is used; the class that is actually
            instantiated is looked up on ``ccxt.async_support``.
    """

    def __init__(self, exchanges):
        self.exchanges = exchanges
        # Symbols requested from every exchange.
        self.trading_pairs = ['BTC/USD', 'ETH/USD', 'ETH/BTC', 'LTC/USD', 'XRP/USD']
        # exchange name -> asyncio.Semaphore, filled by fetch_all_tickers_async.
        self.semaphores = {}
        # Used as the semaphore size: requests allowed in flight per exchange.
        self.rate_limits = {
            'coinbase': 3,
            'bitflyer': 1,
            'gemini': 1,
            'kraken': 1,
            'bitstamp': 2,
        }

    async def _resolve_timestamp(self, exchange_name, exchange_instance, ticker):
        """Return the ticker time in epoch milliseconds, or None if unknown.

        Order of preference: the ticker's ``timestamp``, then its ``datetime``
        string parsed with the exchange's ``parse8601``, then (Kraken only)
        the exchange server time.
        """
        timestamp = ticker['timestamp']
        if not timestamp and ticker['datetime']:
            timestamp = exchange_instance.parse8601(ticker['datetime'])
        if not timestamp and exchange_name == 'kraken':
            timestamp = await exchange_instance.fetch_time()
            print(f"Using Kraken server time as fallback: {timestamp}")
        return timestamp or None

    async def fetch_ticker_async(self, exchange_name, exchange, symbol):
        """Fetch one ticker and reduce it to the fields of interest.

        Args:
            exchange_name: Key into ``self.semaphores`` and label for output.
            exchange: Zero-argument callable returning an async exchange that
                can be used as an async context manager.
            symbol: Market symbol such as ``'BTC/USD'``.

        Returns:
            dict with ``exchange``, ``symbol``, ``timestamp`` (formatted
            string, or None when no time could be determined), ``bid``,
            ``ask``, ``last``, ``high``, ``low`` and ``volume``; or None when
            the symbol is not listed or an error occurred (errors are
            printed, not raised).
        """
        async with self.semaphores[exchange_name]:
            try:
                print(f"Attempting to fetch {symbol} from {exchange_name}")
                async with exchange() as exchange_instance:
                    await exchange_instance.load_markets()
                    if symbol not in exchange_instance.markets:
                        print(f"{exchange_name} does not support {symbol}")
                        return None

                    ticker = await exchange_instance.fetch_ticker(symbol)

                    # One code path for every exchange; a ticker without any
                    # time information is kept with timestamp=None instead of
                    # being lost to a TypeError in format_timestamp.
                    timestamp = await self._resolve_timestamp(
                        exchange_name, exchange_instance, ticker
                    )
                    formatted_time = self.format_timestamp(timestamp)

                    print(f"Successfully fetched {symbol} from {exchange_name}")

                    processed_ticker = {
                        'exchange': exchange_name,
                        'symbol': ticker['symbol'],
                        'timestamp': formatted_time,
                        'bid': ticker['bid'],
                        'ask': ticker['ask'],
                        'last': ticker['last'],
                        'high': ticker.get('high'),
                        'low': ticker.get('low'),
                        'volume': ticker.get('baseVolume'),
                    }

                    print(f"Processed ticker data for {symbol} from {exchange_name}: {processed_ticker}")
                    return processed_ticker
            except ccxt_async.NetworkError as e:
                print(f"Network error fetching {symbol} from {exchange_name}: {str(e)}")
            except ccxt_async.ExchangeError as e:
                print(f"Exchange error fetching {symbol} from {exchange_name}: {str(e)}")
            except Exception as e:
                print(f"Unexpected error fetching {symbol} from {exchange_name}: {str(e)}")
            return None

    async def fetch_all_tickers_async(self):
        """Fetch every trading pair from every configured exchange.

        A fresh semaphore per exchange is created on each call, sized from
        ``self.rate_limits`` (default 1).

        Returns:
            List of ticker dicts; unsupported symbols and failed requests are
            left out.
        """
        tasks = []
        for exchange_name, exchange_class in self.exchanges.items():
            # Resolve by name on the async module (presumably so that classes
            # from the synchronous ccxt package can be passed in as well).
            exchange = getattr(ccxt_async, exchange_class.__name__)
            rate_limit = self.rate_limits.get(exchange_name, 1)
            self.semaphores[exchange_name] = asyncio.Semaphore(rate_limit)
            for symbol in self.trading_pairs:
                tasks.append(self.fetch_ticker_async(exchange_name, exchange, symbol))

        results = await asyncio.gather(*tasks, return_exceptions=True)
        valid_results = [r for r in results if r is not None and not isinstance(r, Exception)]
        print(f"Fetched {len(valid_results)} valid tickers out of {len(results)} attempts")
        return valid_results

    def format_timestamp(self, timestamp):
        """Format epoch milliseconds as ``YYYY-MM-DD HH:MM:SS.SSS`` in UTC.

        Returns None when ``timestamp`` is None.
        """
        # Local import keeps this method self-contained; the cell only
        # imports ``datetime`` from the datetime package.
        from datetime import timezone

        if timestamp is None:
            return None
        dt = datetime.fromtimestamp(timestamp / 1000, tz=timezone.utc)
        # %f yields microseconds; drop the last three digits for milliseconds.
        return dt.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]


# Define the exchanges and pass them to the DataFetcher
exchanges = {
    'kraken': ccxt_async.kraken,
    'coinbase': ccxt_async.coinbase,
    'bitflyer': ccxt_async.bitflyer,
    'gemini': ccxt_async.gemini,
    'bitstamp': ccxt_async.bitstamp
}

# Initialize the DataFetcher
fetcher = DataFetcher(exchanges)

# Run the fetch_all_tickers_async function and display the output in the notebook
async def run_fetcher():
    result = await fetcher.fetch_all_tickers_async()
    print(result)

# Run the fetcher asynchronously in a Jupyter notebook
await run_fetcher()


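CCXT guidelines for timestamps: the next cell formats ticker times with the exchange's own `iso8601()` / `parse8601()` helpers instead of the hand-written `format_timestamp()`.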

In [9]:
import asyncio
from datetime import datetime
import ccxt.async_support as ccxt_async

class DataFetcher:
    """Concurrent ticker fetcher over a set of ccxt async exchanges.

    Ticker times are rendered with the exchange's ``iso8601`` helper;
    progress is reported with ``print``.
    """

    def __init__(self, exchanges):
        # Per-exchange semaphores are created lazily in fetch_all_tickers_async.
        self.semaphores = {}
        self.exchanges = exchanges
        self.trading_pairs = ['BTC/USD', 'ETH/USD', 'ETH/BTC', 'LTC/USD', 'XRP/USD']
        # Semaphore size per exchange (requests allowed in flight at once).
        self.rate_limits = dict(coinbase=3, bitflyer=1, gemini=1, kraken=1, bitstamp=2)

    async def fetch_ticker_async(self, exchange_name, exchange, symbol):
        """Fetch one ticker; return the reduced dict, or None on miss/error."""
        gate = self.semaphores[exchange_name]
        async with gate:
            try:
                print(f"Attempting to fetch {symbol} from {exchange_name}")
                async with exchange() as client:
                    await client.load_markets()
                    if symbol not in client.markets:
                        print(f"{exchange_name} does not support {symbol}")
                        return None

                    ticker = await client.fetch_ticker(symbol)

                    # Only Kraken falls back to the server clock, and only
                    # when the ticker carries no time information at all.
                    use_server_clock = (
                        exchange_name == 'kraken'
                        and ticker['timestamp'] is None
                        and ticker['datetime'] is None
                    )
                    if use_server_clock:
                        server_time = await client.fetch_time()
                        print(f"Using Kraken server time as fallback: {server_time}")
                        formatted_time = client.iso8601(server_time)
                    else:
                        epoch_ms = ticker['timestamp'] or client.parse8601(ticker['datetime'])
                        formatted_time = client.iso8601(epoch_ms)

                    print(f"Successfully fetched {symbol} from {exchange_name}")

                    processed_ticker = dict(
                        exchange=exchange_name,
                        symbol=ticker['symbol'],
                        timestamp=formatted_time,
                        bid=ticker['bid'],
                        ask=ticker['ask'],
                        last=ticker['last'],
                        high=ticker.get('high'),
                        low=ticker.get('low'),
                        volume=ticker.get('baseVolume'),
                    )

                    print(f"Processed ticker data for {symbol} from {exchange_name}: {processed_ticker}")
                    return processed_ticker
            except ccxt_async.NetworkError as e:
                print(f"Network error fetching {symbol} from {exchange_name}: {str(e)}")
            except ccxt_async.ExchangeError as e:
                print(f"Exchange error fetching {symbol} from {exchange_name}: {str(e)}")
            except Exception as e:
                print(f"Unexpected error fetching {symbol} from {exchange_name}: {str(e)}")
            return None

    async def fetch_all_tickers_async(self):
        """Fetch every pair from every exchange; return the successful tickers."""
        pending = []
        for exchange_name, exchange_class in self.exchanges.items():
            async_class = getattr(ccxt_async, exchange_class.__name__)
            limit = self.rate_limits.get(exchange_name, 1)
            self.semaphores[exchange_name] = asyncio.Semaphore(limit)
            pending.extend(
                self.fetch_ticker_async(exchange_name, async_class, pair)
                for pair in self.trading_pairs
            )

        results = await asyncio.gather(*pending, return_exceptions=True)
        valid_results = [
            item for item in results
            if not (item is None or isinstance(item, Exception))
        ]
        print(f"Fetched {len(valid_results)} valid tickers out of {len(results)} attempts")
        return valid_results

    def format_timestamp(self, timestamp):
        """Render epoch milliseconds as ``YYYY-MM-DD HH:MM:SS.SSS``."""
        seconds = timestamp / 1000
        moment = datetime.utcfromtimestamp(seconds)
        with_micros = moment.strftime('%Y-%m-%d %H:%M:%S.%f')
        # Cut microseconds down to milliseconds.
        return with_micros[:-3]

# Define the exchanges and pass them to the DataFetcher
exchanges = {
    'kraken': ccxt_async.kraken,
    'coinbase': ccxt_async.coinbase,
    'bitflyer': ccxt_async.bitflyer,
    'gemini': ccxt_async.gemini,
    'bitstamp': ccxt_async.bitstamp
}

# Initialize the DataFetcher
fetcher = DataFetcher(exchanges)

# Async function to run the fetcher and print results
async def run_fetcher():
    print("Starting ticker fetch...")
    result = await fetcher.fetch_all_tickers_async()
    print("All tickers fetched:")
    print(result)

# Running the function in Jupyter notebook or as a Python script
if __name__ == "__main__":
    # For Python script: Uncomment this line
    # asyncio.run(run_fetcher())

    # For Jupyter notebook, use this to run the async function:
    await run_fetcher()


Starting ticker fetch...
Attempting to fetch BTC/USD from kraken
Attempting to fetch BTC/USD from coinbase
Attempting to fetch ETH/USD from coinbase
Attempting to fetch ETH/BTC from coinbase
Attempting to fetch BTC/USD from bitflyer
Attempting to fetch BTC/USD from gemini
Attempting to fetch BTC/USD from bitstamp
Attempting to fetch ETH/USD from bitstamp
Successfully fetched BTC/USD from bitstamp
Processed ticker data for BTC/USD from bitstamp: {'exchange': 'bitstamp', 'symbol': 'BTC/USD', 'timestamp': '2024-09-24T04:21:04.000Z', 'bid': 62973.0, 'ask': 62974.0, 'last': 63018.0, 'high': 64204.0, 'low': 62720.0, 'volume': 1097.68149457}
Successfully fetched ETH/USD from bitstamp
Processed ticker data for ETH/USD from bitstamp: {'exchange': 'bitstamp', 'symbol': 'ETH/USD', 'timestamp': '2024-09-24T04:21:03.000Z', 'bid': 2618.8, 'ask': 2619.2, 'last': 2619.4, 'high': 2703.8, 'low': 2609.8, 'volume': 4077.31421682}
Attempting to fetch ETH/BTC from bitstamp
Attempting to fetch LTC/USD from b