# DataBento Downloader

In [None]:
''' DataBento Downloader
    Downloads the tickers within the tickers[] list. Saves them locally to a CSV file.
'''
### Region Imports ###
import databento as db
from dotenv import load_dotenv
import os
import pandas as pd
from datetime import datetime
### End Region Imports ###

# Load variables from a .env file (if one is found) into the environment so that
# os.getenv below can see the API key.
load_dotenv()

# Initialize Data Bento client
client = db.Historical(os.getenv('databento_api_key'))

# Example tickers and date range
tickers = ['SPY']
start_date = '2020-01-01'
end_date = '2024-09-01'  # NOTE(review): Databento documents `end` as exclusive — confirm the last day you need is included

# Raw downloads are written here; create the folder so writing doesn't fail on a fresh checkout.
download_dir = 'databento/downloads'
os.makedirs(download_dir, exist_ok=True)

for ticker in tickers:
    store = client.timeseries.get_range(
        dataset="XNAS.ITCH",
        symbols=ticker,
        start=start_date,
        end=end_date,
        schema='ohlcv-1d'
    )
    store.to_csv(f'{download_dir}/{ticker}_data.csv')


# Converter Method
Working as of 26SEP24

In [47]:
import pandas as pd
from datetime import datetime
import zipfile
import os

# Default ticker for the conversion call below (used to build csv_file and passed to
# convert_to_lean_format); inside convert_to_lean_format the `ticker` parameter shadows this global.
ticker = 'SPY'
def convert_to_lean_format(csv_file, ticker, frequency='daily', output_root='data/equity/usa', keep_csv=True):
    """
    Converts a CSV file containing stock data into a format compatible with LEAN Local CLI Framework.

    Each output row is ``YYYYMMDD HH:MM,open,high,low,close,volume`` (no header), with prices
    converted from dollars to integer deci-cents (dollars * 10000, rounded to the nearest
    integer). The rows are written to ``<output_root>/<resolution>/<ticker>.csv`` and that CSV
    is then zipped into ``<ticker>.zip`` in the same folder (ticker lower-cased).

    Args:
        csv_file (str | os.PathLike): The path to the input CSV file. It must contain
            ``ts_event``, ``open``, ``high``, ``low``, ``close`` and ``volume`` columns;
            other columns are ignored.
        ticker (str): The stock ticker symbol.
        frequency (str, optional): 'daily', 'hourly' or 'minute'; selects the LEAN folder
            'daily', 'hour' or 'minute'. Any other value falls back to 'minute'.
            Defaults to 'daily'.
        output_root (str, optional): Folder that holds the per-resolution folders.
            Defaults to 'data/equity/usa', relative to the working directory.
        keep_csv (bool, optional): If False, delete the unzipped output CSV once it has been
            zipped, leaving the zip as the only output. Defaults to True.
    Returns:
        None: The converted data is saved to disk and a confirmation is printed.
    Raises:
        KeyError: If a required column is missing from ``csv_file``.
    """
    # LEAN's resolution folders are 'daily', 'hour' and 'minute' (note: 'hour', not 'hourly').
    # NOTE(review): LEAN minute equity data is normally one zip per trading day with
    # millisecond-of-day timestamps, so this single-file 'minute' output is probably not
    # readable by LEAN as-is — TODO confirm against the LEAN data-format docs.
    resolution_dirs = {'daily': 'daily', 'hourly': 'hour', 'minute': 'minute'}
    symbol = ticker.lower()
    output_dir = f"{output_root}/{resolution_dirs.get(frequency, 'minute')}/"
    output_file = f"{output_dir}{symbol}.csv"
    zip_file = os.path.join(output_dir, f'{symbol}.zip')

    raw = pd.read_csv(csv_file)
    price_cols = ['open', 'high', 'low', 'close']
    # Conversion from dollars to deci-cents. Round before the integer cast: float products
    # such as 0.57 * 10000 == 5699.999999999999 would otherwise truncate to 5699. Cast to
    # int64 explicitly because plain `int` can map to 32 bits (e.g. NumPy < 2 on Windows).
    bars = (raw[price_cols] * 10000).round().astype('int64')
    bars.insert(0, 'date', pd.to_datetime(raw['ts_event']).dt.strftime('%Y%m%d %H:%M'))
    bars['volume'] = raw['volume']

    os.makedirs(output_dir, exist_ok=True)
    bars.to_csv(output_file, index=False, header=False)

    # Create a zip file and add the csv file to it under the lower-cased ticker name.
    with zipfile.ZipFile(zip_file, 'w', compression=zipfile.ZIP_DEFLATED) as zf:
        zf.write(output_file, arcname=f'{symbol}.csv')

    if not keep_csv:
        os.remove(output_file)

    print(f"{output_file} has been successfully zipped into {zip_file}.")

In [48]:
# Convert the raw download for the global `ticker` into LEAN's daily layout.
frequency = 'daily'
csv_file = f'databento/downloads/{ticker}_data.csv'
convert_to_lean_format(csv_file, ticker, frequency=frequency)

data/equity/usa/daily/spy.csv has been successfully zipped into data/equity/usa/daily/spy.zip.


#### Combined pipeline: download with Databento, then convert to LEAN format

In [2]:
import databento as db
import pandas as pd
from datetime import datetime
import os
import zipfile

def convert_to_lean_format(csv_file, ticker, frequency='daily', output_root='data/equity/usa', keep_csv=True):
    """
    Converts a CSV file containing stock data into a format compatible with LEAN Local CLI Framework.

    Each output row is ``YYYYMMDD HH:MM,open,high,low,close,volume`` (no header), with prices
    converted from dollars to integer deci-cents (dollars * 10000, rounded to the nearest
    integer). The rows are written to ``<output_root>/<resolution>/<ticker>.csv`` and that CSV
    is then zipped into ``<ticker>.zip`` in the same folder (ticker lower-cased).

    Args:
        csv_file (str | os.PathLike): The path to the input CSV file. It must contain
            ``ts_event``, ``open``, ``high``, ``low``, ``close`` and ``volume`` columns;
            other columns are ignored.
        ticker (str): The stock ticker symbol.
        frequency (str, optional): 'daily', 'hourly' or 'minute'; selects the LEAN folder
            'daily', 'hour' or 'minute'. Any other value falls back to 'minute'.
            Defaults to 'daily'.
        output_root (str, optional): Folder that holds the per-resolution folders.
            Defaults to 'data/equity/usa', relative to the working directory.
        keep_csv (bool, optional): If False, delete the unzipped output CSV once it has been
            zipped, leaving the zip as the only output. Defaults to True.
    Returns:
        None: The converted data is saved to disk and a confirmation is printed.
    Raises:
        KeyError: If a required column is missing from ``csv_file``.
    """
    # LEAN's resolution folders are 'daily', 'hour' and 'minute' (note: 'hour', not 'hourly').
    # NOTE(review): LEAN minute equity data is normally one zip per trading day with
    # millisecond-of-day timestamps, so this single-file 'minute' output is probably not
    # readable by LEAN as-is — TODO confirm against the LEAN data-format docs.
    resolution_dirs = {'daily': 'daily', 'hourly': 'hour', 'minute': 'minute'}
    symbol = ticker.lower()
    output_dir = f"{output_root}/{resolution_dirs.get(frequency, 'minute')}/"
    output_file = f"{output_dir}{symbol}.csv"
    zip_file = os.path.join(output_dir, f'{symbol}.zip')

    raw = pd.read_csv(csv_file)
    price_cols = ['open', 'high', 'low', 'close']
    # Conversion from dollars to deci-cents. Round before the integer cast: float products
    # such as 0.57 * 10000 == 5699.999999999999 would otherwise truncate to 5699. Cast to
    # int64 explicitly because plain `int` can map to 32 bits (e.g. NumPy < 2 on Windows).
    bars = (raw[price_cols] * 10000).round().astype('int64')
    bars.insert(0, 'date', pd.to_datetime(raw['ts_event']).dt.strftime('%Y%m%d %H:%M'))
    bars['volume'] = raw['volume']

    os.makedirs(output_dir, exist_ok=True)
    bars.to_csv(output_file, index=False, header=False)

    # Create a zip file and add the csv file to it under the lower-cased ticker name.
    with zipfile.ZipFile(zip_file, 'w', compression=zipfile.ZIP_DEFLATED) as zf:
        zf.write(output_file, arcname=f'{symbol}.csv')

    if not keep_csv:
        os.remove(output_file)

    print(f"{output_file} has been successfully zipped into {zip_file}.")
    
def download_and_convert(tickers, start_date, end_date, frequency='daily'):
    """
    Downloads daily OHLCV bars from Databento for each ticker and converts them to LEAN format.

    For every ticker the raw download is saved to ``databento/downloads/{ticker}_data.csv``
    (replacing any previous download) and then passed to ``convert_to_lean_format``.

    Args:
        tickers (list[str] | str): Ticker symbols to process. A single string is treated as
            one ticker rather than as a sequence of characters.
        start_date (str): Start of the request, passed straight to Databento's ``get_range``.
        end_date (str): End of the request, passed straight to Databento's ``get_range``.
        frequency (str): Passed to ``convert_to_lean_format``. It only selects the output
            folder; the download itself always uses the 'ohlcv-1d' (daily) schema.
    Returns:
        None
    """
    if isinstance(tickers, str):
        tickers = [tickers]

    # Initialize Data Bento client with the api key from the environment
    client = db.Historical(os.getenv('databento_api_key'))

    download_dir = 'databento/downloads'
    os.makedirs(download_dir, exist_ok=True)

    # Loop through tickers within ticker list
    for ticker in tickers:
        csv_file = f'{download_dir}/{ticker}_data.csv'

        store = client.timeseries.get_range(
            dataset="XNAS.ITCH",
            symbols=ticker,
            start=start_date,
            end=end_date,
            schema='ohlcv-1d'
        )

        # Only discard the previous download once the new data has arrived, so a failed
        # request no longer leaves the ticker with no file at all.
        if os.path.exists(csv_file):
            os.remove(csv_file)
        store.to_csv(csv_file)

        # Convert to QuantConnect format
        convert_to_lean_format(csv_file, ticker, frequency)

# Example: download and convert the first five months of 2024 for QQQ
ticker_list = ['QQQ']
download_and_convert(ticker_list, start_date='2024-01-01', end_date='2024-06-01', frequency='daily')


data/equity/usa/daily/qqq.csv has been successfully zipped into data/equity/usa/daily/qqq.zip.


The above is working as of 28SEP24

The cell below adds logic to download data and append it to the existing file when required. The LEAN converter shouldn't need any changes, since it reprocesses the whole dataset anyway.

In [5]:
import databento as db
import pandas as pd
from datetime import timedelta
from pathlib import Path

def get_data_from_databento(ticker, start_date, end_date):
    """
    Fetches daily OHLCV data for a given ticker from Data Bento for the specified date range.

    Args:
        ticker (str): The stock ticker symbol.
        start_date (datetime | str): The start date of the data to retrieve; anything
            ``pd.Timestamp`` accepts. Only the calendar date is sent.
        end_date (datetime | str): The end date of the data to retrieve; anything
            ``pd.Timestamp`` accepts. Only the calendar date is sent.

    Returns:
        pd.DataFrame: The bars returned by ``DBNStore.to_df()``.
    """
    # This cell does not import os itself; import it here instead of relying on an
    # earlier cell having run in the same kernel.
    import os

    client = db.Historical(os.getenv('databento_api_key'))
    store = client.timeseries.get_range(
        dataset="XNAS.ITCH",
        symbols=ticker,
        start=pd.Timestamp(start_date).strftime('%Y-%m-%d'),
        end=pd.Timestamp(end_date).strftime('%Y-%m-%d'),
        schema='ohlcv-1d'
    )
    return store.to_df()

def _index_by_ts_event(frame):
    """Return a new DataFrame indexed by tz-aware UTC `ts_event`, without stray 'Unnamed: N' columns."""
    # 'Unnamed: N' columns appear when a CSV saved together with a plain integer index is
    # read back; they are not data.
    frame = frame.loc[:, ~frame.columns.astype(str).str.startswith('Unnamed')]
    if 'ts_event' in frame.columns:
        frame = frame.set_index('ts_event')
    index = pd.DatetimeIndex(pd.to_datetime(frame.index, utc=True), name='ts_event')
    return frame.set_axis(index, axis=0)


def download_and_append_data(ticker, start_date, end_date, folder='databento/downloads'):
    """
    Downloads and appends stock data from Data Bento API if necessary.

    The raw bars are kept in ``<folder>/<ticker lower-cased>.csv``, indexed by ``ts_event``.
    Only the part of [start_date, end_date] not already on disk is fetched. It is fetched so
    that the stored data stays one contiguous span: a request lying entirely after (or
    before) the stored bars is extended back (or forward) to them, instead of leaving a gap
    that the coverage check would later miss. Overlapping bars are de-duplicated by
    timestamp, keeping the newly downloaded row.

    Args:
        ticker (str): The stock ticker symbol.
        start_date (str | datetime-like): The start date for data retrieval (tz-naive).
        end_date (str | datetime-like): The end date for data retrieval (tz-naive).
        folder (str): Folder to save the CSV file; created if missing.

    Returns:
        str | None: The ticker symbol, or None if fetching or saving failed (the error is printed).
    """
    # Define file path for the ticker
    path = Path(folder) / (ticker.lower() + '.csv')

    # Convert start_date and end_date to datetime objects
    start_date = pd.to_datetime(start_date)
    end_date = pd.to_datetime(end_date)

    df_existing = _index_by_ts_event(pd.read_csv(path)) if path.exists() else None

    # Work out which date ranges still need to be downloaded.
    if df_existing is None or df_existing.empty:
        missing = [(start_date, end_date)]
    else:
        # Stored timestamps are UTC; drop the tz to compare with the naive request dates.
        dates = df_existing.index.tz_localize(None)
        first, last = dates.min(), dates.max()
        missing = []
        if start_date < first:
            missing.append((start_date, first))
        if end_date > last:
            missing.append((last, end_date))

    if missing:
        # Add buffer (delta) to each range to handle overlaps; overlapping bars are
        # removed by the de-duplication below.
        delta = timedelta(days=3)
        try:
            frames = [] if df_existing is None else [df_existing]
            for fetch_start, fetch_end in missing:
                print(f'Fetching data for {ticker} from {fetch_start} to {fetch_end}')
                df_new = get_data_from_databento(ticker, fetch_start - delta, fetch_end + delta)
                frames.append(_index_by_ts_event(df_new))
            df_combined = pd.concat(frames)
            # One row per timestamp; later frames (fresh downloads) win over stored rows.
            df_combined = df_combined[~df_combined.index.duplicated(keep='last')].sort_index()
            path.parent.mkdir(parents=True, exist_ok=True)
            df_combined.to_csv(path)
            print(f'Ticker {ticker} data saved to {path}')
        except Exception as e:
            print(f'Error fetching data for {ticker}: {e}')
            return None
    else:
        print(f'Ticker {ticker} already up-to-date')

    return ticker

# Example usage: make sure QQQ is stored for the first five months of 2024
download_and_append_data('QQQ', start_date='2024-01-01', end_date='2024-06-01')


Ticker QQQ already up-to-date


'QQQ'

### Fully Integrated Pipeline
28SEP24

In [15]:
import databento as db
import pandas as pd
from datetime import timedelta
from pathlib import Path
import zipfile
import os

def get_data_from_databento(ticker, start_date, end_date):
    """
    Download daily OHLCV bars for one ticker from Databento's XNAS.ITCH dataset.

    Args:
        ticker (str): The stock ticker symbol.
        start_date (datetime): First day of the request; formatted as YYYY-MM-DD.
        end_date (datetime): Last day of the request; formatted as YYYY-MM-DD.

    Returns:
        pd.DataFrame: The bars returned by ``DBNStore.to_df()``.
    """
    api_key = os.getenv('databento_api_key')
    historical = db.Historical(api_key)
    date_fmt = '%Y-%m-%d'
    store = historical.timeseries.get_range(
        dataset="XNAS.ITCH",
        symbols=ticker,
        start=start_date.strftime(date_fmt),
        end=end_date.strftime(date_fmt),
        schema='ohlcv-1d',
    )
    return store.to_df()

def convert_to_lean_format(csv_file, ticker, frequency='daily', output_root='data/equity/usa', keep_csv=True):
    """
    Converts a CSV file containing stock data into a format compatible with LEAN Local CLI Framework.

    Each output row is ``YYYYMMDD HH:MM,open,high,low,close,volume`` (no header), with prices
    converted from dollars to integer deci-cents (dollars * 10000, rounded to the nearest
    integer). The rows are written to ``<output_root>/<resolution>/<ticker>.csv`` and that CSV
    is then zipped into ``<ticker>.zip`` in the same folder (ticker lower-cased).

    Args:
        csv_file (str | os.PathLike): The path to the input CSV file. It must contain
            ``ts_event``, ``open``, ``high``, ``low``, ``close`` and ``volume`` columns;
            other columns are ignored.
        ticker (str): The stock ticker symbol.
        frequency (str, optional): 'daily', 'hourly' or 'minute'; selects the LEAN folder
            'daily', 'hour' or 'minute'. Any other value falls back to 'minute'.
            Defaults to 'daily'.
        output_root (str, optional): Folder that holds the per-resolution folders.
            Defaults to 'data/equity/usa', relative to the working directory.
        keep_csv (bool, optional): If False, delete the unzipped output CSV once it has been
            zipped, leaving the zip as the only output. Defaults to True.
    Returns:
        None: The converted data is saved to disk and a confirmation is printed.
    Raises:
        KeyError: If a required column is missing from ``csv_file``.
    """
    # LEAN's resolution folders are 'daily', 'hour' and 'minute' (note: 'hour', not 'hourly').
    # NOTE(review): LEAN minute equity data is normally one zip per trading day with
    # millisecond-of-day timestamps, so this single-file 'minute' output is probably not
    # readable by LEAN as-is — TODO confirm against the LEAN data-format docs.
    resolution_dirs = {'daily': 'daily', 'hourly': 'hour', 'minute': 'minute'}
    symbol = ticker.lower()
    output_dir = f"{output_root}/{resolution_dirs.get(frequency, 'minute')}/"
    output_file = f"{output_dir}{symbol}.csv"
    zip_file = os.path.join(output_dir, f'{symbol}.zip')

    raw = pd.read_csv(csv_file)
    price_cols = ['open', 'high', 'low', 'close']
    # Conversion from dollars to deci-cents. Round before the integer cast: float products
    # such as 0.57 * 10000 == 5699.999999999999 would otherwise truncate to 5699. Cast to
    # int64 explicitly because plain `int` can map to 32 bits (e.g. NumPy < 2 on Windows).
    bars = (raw[price_cols] * 10000).round().astype('int64')
    bars.insert(0, 'date', pd.to_datetime(raw['ts_event']).dt.strftime('%Y%m%d %H:%M'))
    bars['volume'] = raw['volume']

    os.makedirs(output_dir, exist_ok=True)
    bars.to_csv(output_file, index=False, header=False)

    # Create a zip file and add the csv file to it under the lower-cased ticker name.
    with zipfile.ZipFile(zip_file, 'w', compression=zipfile.ZIP_DEFLATED) as zf:
        zf.write(output_file, arcname=f'{symbol}.csv')

    if not keep_csv:
        os.remove(output_file)

    print(f"{output_file} has been successfully zipped into {zip_file}.")
    
def _index_by_ts_event(frame):
    """Return a new DataFrame indexed by tz-aware UTC `ts_event`, without stray 'Unnamed: N' columns."""
    # 'Unnamed: N' columns appear when a CSV saved together with a plain integer index is
    # read back; they are not data.
    frame = frame.loc[:, ~frame.columns.astype(str).str.startswith('Unnamed')]
    if 'ts_event' in frame.columns:
        frame = frame.set_index('ts_event')
    index = pd.DatetimeIndex(pd.to_datetime(frame.index, utc=True), name='ts_event')
    return frame.set_axis(index, axis=0)


def download_and_append_data(ticker, start_date, end_date, folder='databento/downloads', frequency='daily'):
    """
    Downloads and appends stock data from Data Bento API if necessary, then converts the data to QuantConnect format.

    The raw bars are kept in ``<folder>/<ticker>_data.csv``, indexed by ``ts_event``. Only the
    part of [start_date, end_date] not already on disk is fetched. It is fetched so that the
    stored data stays one contiguous span: a request lying entirely after (or before) the
    stored bars is extended back (or forward) to them, instead of leaving a gap that the
    coverage check would later miss. Overlapping bars are de-duplicated by timestamp,
    keeping the newly downloaded row. The CSV is then converted with
    ``convert_to_lean_format``, also when no download was needed.

    Args:
        ticker (str): The stock ticker symbol.
        start_date (str | datetime-like): The start date for data retrieval (tz-naive).
        end_date (str | datetime-like): The end date for data retrieval (tz-naive).
        folder (str): Folder to save the CSV file; created if missing.
        frequency (str): Data frequency ('daily', 'hourly', 'minute'), passed to the converter.

    Returns:
        str | None: The ticker symbol, or None if fetching or saving failed (the error is
        printed and no conversion is run).
    """
    # Define file path for the ticker - Saves the databento download file
    path = Path(folder) / f'{ticker}_data.csv'

    # Convert start_date and end_date to datetime objects
    start_date = pd.to_datetime(start_date)
    end_date = pd.to_datetime(end_date)

    df_existing = _index_by_ts_event(pd.read_csv(path)) if path.exists() else None

    # Work out which date ranges still need to be downloaded.
    if df_existing is None or df_existing.empty:
        missing = [(start_date, end_date)]
    else:
        # Stored timestamps are UTC; drop the tz to compare with the naive request dates.
        dates = df_existing.index.tz_localize(None)
        first, last = dates.min(), dates.max()
        missing = []
        if start_date < first:
            missing.append((start_date, first))
        if end_date > last:
            missing.append((last, end_date))

    if missing:
        # Add buffer (delta) to each range to handle overlaps; overlapping bars are
        # removed by the de-duplication below.
        delta = timedelta(days=3)
        try:
            frames = [] if df_existing is None else [df_existing]
            for fetch_start, fetch_end in missing:
                print(f'Fetching data for {ticker} from {fetch_start} to {fetch_end}')
                df_new = get_data_from_databento(ticker, fetch_start - delta, fetch_end + delta)
                frames.append(_index_by_ts_event(df_new))
            df_combined = pd.concat(frames)
            # One row per timestamp; later frames (fresh downloads) win over stored rows.
            df_combined = df_combined[~df_combined.index.duplicated(keep='last')].sort_index()
            path.parent.mkdir(parents=True, exist_ok=True)
            df_combined.to_csv(path)
            print(f'Ticker {ticker} data saved to {path}')
        except Exception as e:
            print(f'Error fetching data for {ticker}: {e}')
            return None
    else:
        print(f'Ticker {ticker} already up-to-date')

    # Convert to QuantConnect format (path = path to csv file incl. .csv, ticker = ticker symbol, frequency = 'daily', 'hourly', 'minute')
    convert_to_lean_format(path, ticker, frequency)

    return ticker

# Example usage: a single-day request for QQQ
download_and_append_data('QQQ', start_date='2022-12-01', end_date='2022-12-01')


Fetching data for QQQ from 2022-12-01 00:00:00 to 2022-12-01 00:00:00
Ticker QQQ data saved to databento\downloads\QQQ_data.csv
data/equity/usa/daily/qqq.csv has been successfully zipped into data/equity/usa/daily/qqq.zip.


'QQQ'

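Working. Next: test it in the QC `localdatatestspy` project.
Also figure out why a CSV gets saved to both `databento\downloads` AND the target folder. (Likely answer: they are two different files. One is the raw Databento download, `databento/downloads/{ticker}_data.csv`. The other is the LEAN-format `{ticker}.csv` that `convert_to_lean_format` writes into the target folder before zipping it and does not delete afterwards.)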

In [17]:
# testing convert_utc_to_ny time
def convert_utc_to_ny(df, datetime_column='ts_event'):
    """
    Convert a datetime column from UTC to New York time (Eastern Time).

    Naive timestamps are treated as UTC; timestamps that already carry an offset are
    converted from that offset. The column is replaced in ``df`` itself, and the same
    DataFrame object is returned.

    Note: daily bars stamped at 00:00 UTC become 19:00/20:00 on the *previous* New York
    calendar day (e.g. 2022-11-30 00:00+00:00 -> 2022-11-29 19:00-05:00), so don't read
    the trading date off the converted value for daily data.

    Args:
        df (pd.DataFrame): The DataFrame containing the datetime column.
        datetime_column (str): The name of the datetime column to be converted. Defaults to 'ts_event'.

    Returns:
        pd.DataFrame: The same DataFrame, with the column converted to America/New_York.
    """
    # utc=True localizes naive values to UTC and converts offset-aware values to UTC in one
    # step, so no try/except around tz_localize is needed (the old bare `except` also hid
    # unrelated errors).
    df[datetime_column] = pd.to_datetime(df[datetime_column], utc=True).dt.tz_convert('America/New_York')

    return df

# Load the stored QQQ download and shift its timestamps to New York time.
df = pd.read_csv(
    'databento/downloads/QQQ_data.csv',
)
df_ny = convert_utc_to_ny(df, datetime_column='ts_event')

In [18]:
df_ny.head(n=5)

Unnamed: 0,ts_event,rtype,publisher_id,instrument_id,open,high,low,close,volume,symbol
0,2022-11-29 19:00:00-05:00,35,2,8933,281.65,294.2,279.97,294.0,13716247.0,QQQ
1,2022-11-30 19:00:00-05:00,35,2,8929,293.0,295.75,290.88,293.24,8271505.0,QQQ
2,2022-12-01 19:00:00-05:00,35,2,8919,293.43,293.94,286.0,291.94,9893054.0,QQQ
3,2022-12-04 19:00:00-05:00,35,2,8916,292.04,292.26,286.15,288.02,6803353.0,QQQ
4,2022-12-05 19:00:00-05:00,35,2,8915,288.28,288.58,280.26,282.15,9180658.0,QQQ


In [1]:
import os
from sqlalchemy import create_engine
from sqlalchemy.exc import SQLAlchemyError

def upload_to_postgresql(df, ticker, schema='databento_ohlcv', database='FinancialData'):
    """
    Uploads a DataFrame to a PostgreSQL database using SQLAlchemy.

    Connection details come from the ``pguser``, ``pgpass`` and ``pghost`` environment
    variables. The user name and password are URL-escaped before being placed in the
    connection URL, so passwords containing characters such as '@', ':' or '/' work.
    An existing table with the same name is replaced.

    Args:
        df (pd.DataFrame): The DataFrame containing OHLCV data. Its index is not written.
        ticker (str): The stock ticker symbol (used as the table name).
        schema (str): The schema where the table should be created. Default is 'databento_ohlcv'.
        database (str): The database to connect to. Default is 'FinancialData'.

    Returns:
        None. Success or failure is reported with print(); SQLAlchemy errors are not raised.
    """
    from urllib.parse import quote_plus

    # Fetch credentials from environment variables
    pguser = os.getenv('pguser')
    pgpass = os.getenv('pgpass')
    pghost = os.getenv('pghost')

    unset = [name for name, value in (('pguser', pguser), ('pgpass', pgpass), ('pghost', pghost)) if value is None]
    if unset:
        print(f"Error uploading data for {ticker} to PostgreSQL: environment variable(s) not set: {', '.join(unset)}")
        return

    # Escape the credentials so reserved URL characters cannot corrupt the connection URL.
    db_url = f'postgresql://{quote_plus(pguser)}:{quote_plus(pgpass)}@{pghost}/{database}'

    engine = None
    try:
        # create_engine is inside the try so a malformed URL is reported like other
        # SQLAlchemy errors instead of escaping the handler.
        engine = create_engine(db_url)
        # Write the DataFrame to the PostgreSQL table
        df.to_sql(ticker, engine, schema=schema, if_exists='replace', index=False)
        print(f"Data for {ticker} uploaded successfully to {schema}.{ticker}.")

    except SQLAlchemyError as e:
        print(f"Error uploading data for {ticker} to PostgreSQL: {e}")

    finally:
        if engine is not None:
            engine.dispose()

In [3]:
import pandas as pd
df = pd.read_csv(filepath_or_buffer='databento/downloads/QQQ_data.csv')
df.head(n=5)

Unnamed: 0,ts_event,rtype,publisher_id,instrument_id,open,high,low,close,volume,symbol
0,2022-11-30 00:00:00+00:00,35,2,8933,281.65,294.2,279.97,294.0,13716247.0,QQQ
1,2022-12-01 00:00:00+00:00,35,2,8929,293.0,295.75,290.88,293.24,8271505.0,QQQ
2,2022-12-02 00:00:00+00:00,35,2,8919,293.43,293.94,286.0,291.94,9893054.0,QQQ
3,2022-12-05 00:00:00+00:00,35,2,8916,292.04,292.26,286.15,288.02,6803353.0,QQQ
4,2022-12-06 00:00:00+00:00,35,2,8915,288.28,288.58,280.26,282.15,9180658.0,QQQ


In [4]:
upload_to_postgresql(df, ticker='QQQ', schema='databento_ohlcv')

Data for QQQ uploaded successfully to databento_ohlcv.QQQ.
