In [None]:
import os
from datetime import datetime, timedelta, timezone
from time import sleep
from urllib.parse import quote_plus

import ccxt
import mysql.connector
import pandas as pd
import requests
from sqlalchemy import create_engine, text
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import sessionmaker

# MySQL connection details (credentials come from the environment, never hard-coded)
user = os.getenv('MYSQL_USER')
password = os.getenv('MYSQL_PASSWORD')
host = 'mysql'  # This should be the name of the MySQL service/container
database = 'crypto_data'

# Create a connection string. Credentials are URL-quoted so characters such as
# '@', ':' or '/' in the password cannot corrupt the URL; a missing env var
# becomes an empty string instead of the literal text "None".
connection_string = (
    f'mysql+mysqlconnector://{quote_plus(user or "")}:{quote_plus(password or "")}'
    f'@{host}/{database}'
)

# Create a SQLAlchemy engine and a configured "Session" class
engine = create_engine(connection_string)
Session = sessionmaker(bind=engine)

# Resume point: the newest candle timestamp already stored in BTCUSD.
last_date = None
session = Session()
try:
    query = text("SELECT datetime FROM BTCUSD ORDER BY datetime DESC LIMIT 1;")
    # scalar() returns the first column of the first row, or None for an empty table.
    last_date = session.execute(query).scalar()
    print(f"Last date: {last_date}")

except SQLAlchemyError as e:
    # Handle SQLAlchemy-specific errors
    print(f"An error occurred: {e}")
    session.rollback()  # Rollback any changes if an error occurred

except Exception as e:
    # Handle other possible exceptions
    print(f"An unexpected error occurred: {e}")

finally:
    session.close()
    # Dispose of the engine to close all pooled connections
    engine.dispose()

# Fail loudly with a clear message instead of a NameError/IndexError further down.
if last_date is None:
    raise RuntimeError(
        "Could not determine the last stored BTCUSD timestamp (query failed or table is empty); "
        "set request_date manually before downloading."
    )

# Start downloading one minute after the newest stored candle.
request_date = last_date + timedelta(minutes=1)

Last date: 2024-09-04 00:28:00


In [None]:
def get_coinapi(currency, request_date):
    """Fetch 1-minute OHLCV candles for ``currency`` on Kraken from CoinAPI.

    Requests candles from ``request_date`` up to the current minute (UTC).

    Args:
        currency: CoinAPI symbol suffix, e.g. ``'BTC_USD'``; combined into
            ``KRAKEN_SPOT_<currency>``.
        request_date: start of the range, sent as its ISO-8601 string.
            NOTE(review): a naive value is presumably read by CoinAPI as UTC — confirm.

    Returns:
        DataFrame whose ``datetime`` column holds ``'%Y-%m-%d %H:%M:%S'`` strings, plus
        the renamed price/volume/trade columns. If the request fails (non-200) or returns
        no rows, an empty DataFrame with columns datetime, open, high, low, close, volume,
        trades is returned instead of raising.
    """
    # CoinAPI API key from the environment
    api_key = os.getenv('COIN_API_KEY')

    exchange_id = 'KRAKEN'
    url = f'https://rest.coinapi.io/v1/ohlcv/{exchange_id}_SPOT_{currency}/history'

    headers = {
        'X-CoinAPI-Key': api_key,
        'Accept': 'application/json'
    }

    # The rest of this notebook stores naive UTC timestamps (Binance epoch-ms via
    # pd.to_datetime(unit='ms')), so the window end is "now" in UTC, not local time.
    time_end = datetime.now(timezone.utc).replace(tzinfo=None, second=0, microsecond=0)

    params = {
        'period_id': '1MIN',                     # 1-minute interval
        'time_start': request_date.isoformat(),  # Start date in ISO 8601
        'time_end': time_end.isoformat(),
        'limit': 100000                          # Maximum number of records per page
    }
    empty_columns = ['datetime', 'open', 'high', 'low', 'close', 'volume', 'trades']

    # Bounded timeout so a stalled connection cannot hang the notebook indefinitely.
    response = requests.get(url, headers=headers, params=params, timeout=30)

    if response.status_code != 200:
        print(f"Failed to fetch data: {response.status_code}, {response.text}")
        return pd.DataFrame(columns=empty_columns)

    ohlcv_data = response.json()
    if not ohlcv_data:
        # No candles in the window: nothing to rename or parse.
        return pd.DataFrame(columns=empty_columns)

    df = pd.DataFrame(ohlcv_data).rename(columns={
        'time_period_start': 'datetime',
        'price_open': 'open',
        'price_high': 'high',
        'price_low': 'low',
        'price_close': 'close',
        'volume_traded': 'volume',
        'trades_count': 'trades'
    })

    # Normalise timestamps to the plain string format used for the DB insert.
    df['datetime'] = pd.to_datetime(df['datetime']).dt.strftime('%Y-%m-%d %H:%M:%S')

    # Drop the period bookkeeping columns; ignore any the API did not return.
    return df.drop(columns=['time_period_end', 'time_open', 'time_close'], errors='ignore')

Failed to fetch data: 429, {
  "title": "Forbidden",
  "status": 429,
  "detail": "Quota exceeded: REST API Requests per day.",
  "error": "Forbidden (Quota exceeded: REST API Requests per day.)",
  "QuotaKey": "CAPI_DTA_REST_REQ_QUOTA_DAILY",
  "QuotaName": "REST API Requests per day",
  "QuotaType": "Quota",
  "QuotaValueCurrentUsage": 981,
  "QuotaValue": 100,
  "QuotaValueUnit": "Credits",
  "QuotaValueAdjustable": "Yes, upgrade subscription or enable pay-as-you-go billing above the organizaiton quota."
}


In [107]:
# Show the resume timestamp computed in the first cell (newest stored BTCUSD candle + 1 minute).
request_date

datetime.datetime(2024, 3, 10, 10, 40)

In [229]:
def get_data(currency, request_date, max_retries=5, retry_delay=1.0, exchange=None):
    """Download 1-minute OHLCV candles for ``currency`` starting at ``request_date``.

    Pages forward through ``exchange.fetch_ohlcv`` (up to 1000 candles per call),
    advancing one minute past the last candle received, until an empty page is returned.

    Args:
        currency: ccxt market symbol, e.g. ``'BTC/USDT'``.
        request_date: first candle to request. A naive datetime is interpreted as UTC,
            matching the naive-UTC timestamps this notebook stores
            (``pd.to_datetime(..., unit='ms')``).
        max_retries: consecutive failed requests tolerated for one page before giving up.
        retry_delay: base back-off in seconds, doubled after each consecutive failure.
        exchange: ccxt exchange instance to use; defaults to a new ``ccxt.binance()``.

    Returns:
        list of ``[timestamp_ms, open, high, low, close, volume]`` rows as returned by ccxt.

    Raises:
        The last exception from ``fetch_ohlcv`` once ``max_retries`` consecutive attempts fail
        (previously the loop retried forever).
    """
    if exchange is None:
        exchange = ccxt.binance()

    timeframe = '1m'  # 1-minute interval
    page_limit = 1000
    one_minute_ms = 60_000

    # datetime.timestamp() treats naive values as *local* time; pin them to UTC so the
    # resume point is correct on hosts whose timezone is not UTC.
    if request_date.tzinfo is None:
        request_date = request_date.replace(tzinfo=timezone.utc)
    since = int(request_date.timestamp()) * 1000

    print(f"Requesting ohlcv - Tool: {exchange.name} Currency: {currency} Timeframe: {timeframe}")
    result = []
    failures = 0
    while True:
        print(f'Collecting from: {datetime.fromtimestamp(since / 1000, tz=timezone.utc)}')
        try:
            page = exchange.fetch_ohlcv(currency, timeframe, since, limit=page_limit)
        except Exception as e:
            failures += 1
            print(f"Error occurred while fetching ohlcv data from {exchange.name}: {e}")
            if failures >= max_retries:
                raise
            # Exponential back-off before retrying the same page.
            sleep(retry_delay * 2 ** (failures - 1))
            continue

        failures = 0
        if not page:
            break
        result.extend(page)
        since = page[-1][0] + one_minute_ms  # add a minute to the last collected candle

    return result

currencies = ['BTC/USDT','ETH/USDT','ETC/USDT','XRP/USDT','ADA/USDT','SOL/USDT','DOGE/USDT','MATIC/USDT','LINK/USDT','LTC/USDT']

# Only BTC/USDT is downloaded here: the insert cell writes into the BTCUSD table.
# get_data requires the symbol as its first argument (it was previously omitted).
result = get_data('BTC/USDT', request_date)
df = pd.DataFrame(result, columns=['datetime', 'open', 'high', 'low', 'close', 'volume'])
# ccxt timestamps are epoch milliseconds; unit='ms' yields naive UTC datetimes.
df['datetime'] = pd.to_datetime(df['datetime'], unit='ms')
# The OHLCV rows carry no trade count; store NULL.
df['trades'] = None

# Drop the still-forming current minute: rows are written with INSERT IGNORE and the
# next run resumes after the newest stored candle, so a partial candle would never be fixed.
now_utc = pd.Timestamp.now(tz='UTC').tz_localize(None)
df = df[df['datetime'] + pd.Timedelta(minutes=1) <= now_utc].reset_index(drop=True)

Unnamed: 0,datetime,open,high,low,close,volume,trades
0,2024-03-10 10:40:00,69740.01,69756.18,69730.00,69737.57,16.39132,
1,2024-03-10 10:41:00,69737.57,69761.05,69681.10,69686.01,27.83064,
2,2024-03-10 10:42:00,69686.01,69706.78,69601.10,69650.16,40.63959,
3,2024-03-10 10:43:00,69650.15,69680.00,69622.21,69668.00,40.24856,
4,2024-03-10 10:44:00,69668.00,69710.00,69579.77,69648.06,59.16559,
...,...,...,...,...,...,...,...
255704,2024-09-04 00:24:00,57910.00,57931.82,57909.99,57929.01,8.33225,
255705,2024-09-04 00:25:00,57929.01,57929.01,57890.01,57890.01,7.75151,
255706,2024-09-04 00:26:00,57890.01,57890.01,57868.22,57868.22,15.58424,
255707,2024-09-04 00:27:00,57868.22,57879.00,57824.00,57824.01,22.36212,


In [235]:
# Function to insert data into MySQL table using batch inserts with chunking
def insert_data_to_db(df, table_name, chunk_size=1000, conn=None):
    """Insert OHLCV rows from ``df`` into ``table_name`` in chunks with ``INSERT IGNORE``.

    All chunks are committed in a single transaction. On any error the transaction is
    rolled back, a message is printed and the exception is re-raised.
    NOTE(review): duplicate-skipping relies on a unique key in the table (presumably on
    ``datetime``) — confirm the schema.

    Args:
        df: DataFrame containing the columns datetime, open, high, low, close, volume,
            trades (extra columns are ignored). It is not modified.
        table_name: target table; must be a plain identifier (letters, digits, underscore).
        chunk_size: number of rows per ``executemany`` call.
        conn: DB-API connection to use; defaults to the notebook-level ``connection``.

    Raises:
        ValueError: if ``table_name`` is not a plain identifier or ``chunk_size`` < 1.
        Exception: any error raised while preparing or inserting rows (after rollback).
    """
    import re

    # The table name cannot be a bound parameter, so restrict it to a safe identifier.
    if not re.fullmatch(r'\w+', table_name):
        raise ValueError(f"Invalid table name: {table_name!r}")
    if chunk_size < 1:
        raise ValueError(f"chunk_size must be >= 1, got {chunk_size}")
    if df.empty:
        return
    if conn is None:
        conn = connection  # notebook-level mysql.connector connection

    columns = ['datetime', 'open', 'high', 'low', 'close', 'volume', 'trades']
    column_sql = ', '.join(f'`{name}`' for name in columns)
    placeholder_sql = ', '.join(['%s'] * len(columns))
    insert_query = f"INSERT IGNORE INTO `{table_name}` ({column_sql}) VALUES ({placeholder_sql})"

    cur = None
    try:
        # Select columns explicitly (order must match the INSERT) and work on a copy so
        # the caller's DataFrame keeps its datetime dtype.
        rows = df.loc[:, columns].copy()
        rows['datetime'] = pd.to_datetime(rows['datetime']).dt.strftime('%Y-%m-%d %H:%M:%S')
        # NaN/NaT are not valid MySQL parameters; send NULL instead.
        records = [
            tuple(None if pd.isna(value) else value for value in record)
            for record in rows.itertuples(index=False, name=None)
        ]

        cur = conn.cursor()
        for start in range(0, len(records), chunk_size):
            cur.executemany(insert_query, records[start:start + chunk_size])
        conn.commit()
    except Exception as e:
        conn.rollback()
        print(f"Failed to insert data into {table_name}: {e}")
        raise
    finally:
        if cur is not None:
            cur.close()

In [None]:
# Open a direct mysql-connector connection for the batch inserts.
# Credentials are read from the environment, never hard-coded.
db_settings = {
    'host': 'mysql',
    'user': os.getenv('MYSQL_USER'),
    'password': os.getenv('MYSQL_PASSWORD'),
    'database': 'crypto_data',
}
connection = mysql.connector.connect(**db_settings)

# Notebook-level cursor; closed together with the connection in the final cell.
cursor = connection.cursor()

In [237]:
# Write the downloaded BTC/USDT candles, then always release the DB handles, even if
# the insert fails (insert_data_to_db rolls back and re-raises on error).
try:
    insert_data_to_db(df, 'BTCUSD')
finally:
    # Close the cursor and connection
    cursor.close()
    connection.close()