In [3]:
import os
import requests
import yfinance as yf
import pandas as pd
import psycopg2
from psycopg2.extras import execute_values
from datetime import datetime

# Database Configuration
DB_URL = "postgresql://user:password@192.168.0.34:5432/rag_db"
TABLE_NAME = "all_market_data"
OUTPUT_CSV = "market_data.csv"

# Retrieve environment variables for Alpaca API
API_KEY = os.getenv('APCA_API_KEY_ID')
SECRET_KEY = os.getenv('APCA_API_SECRET_KEY')
BASE_URL = os.getenv('APCA_API_BASE_URL')

# Alpaca API Endpoint for Stock Assets
ALPACA_ENDPOINT = f"{BASE_URL}/v2/assets"

# Headers for authentication
HEADERS = {
    "APCA-API-KEY-ID": API_KEY,
    "APCA-API-SECRET-KEY": SECRET_KEY
}

# Ensure database table exists
def ensure_table_exists():
    try:
        connection = psycopg2.connect(DB_URL)
        cursor = connection.cursor()
        create_table_query = f"""
        CREATE TABLE IF NOT EXISTS {TABLE_NAME} (
            date DATE,
            open NUMERIC,
            high NUMERIC,
            low NUMERIC,
            close NUMERIC,
            adj_close NUMERIC,
            volume BIGINT,
            ticker VARCHAR(20)
        )
        """
        cursor.execute(create_table_query)
        connection.commit()
        cursor.close()
        connection.close()
        print(f"Table {TABLE_NAME} ensured to exist.")
    except Exception as e:
        print(f"Error ensuring table exists: {e}")

# Fetch all stock tickers from Alpaca
def get_alpaca_tickers(status="active", asset_class="us_equity"):
    """
    Fetches all stock tickers from Alpaca API.
    Filters only active U.S. equities.
    """
    try:
        response = requests.get(ALPACA_ENDPOINT, headers=HEADERS)
        if response.status_code != 200:
            print(f"Error: {response.status_code}, {response.text}")
            return []

        assets = response.json()

        # Extract only active US equity tickers
        stock_tickers = [
            asset["symbol"] for asset in assets
            if asset["status"] == status and asset["asset_class"] == asset_class
        ]

        return stock_tickers

    except Exception as e:
        print(f"Error fetching tickers from Alpaca: {e}")
        return []

# Function to fetch data from yfinance
def fetch_data(ticker):
    print(f"Fetching data for {ticker}...")
    try:
        # Fetch market data (last 2 days)
        market_data = yf.download(ticker, period="2d", progress=False)
        market_data.reset_index(inplace=True)
        market_data["Ticker"] = ticker

        # Ensure 'Adj Close' column exists
        if "Adj Close" not in market_data.columns:
            market_data["Adj Close"] = market_data["Close"]

        # Keep only required columns
        market_data = market_data[["Date", "Open", "High", "Low", "Close", "Adj Close", "Volume", "Ticker"]]
        print(f"Data for {ticker} fetched successfully.")
        return market_data
    except Exception as e:
        print(f"Error fetching data for {ticker}: {e}")
        return pd.DataFrame()

# Save data to CSV file
def save_to_csv(dataframes):
    try:
        full_data = pd.concat(dataframes, ignore_index=True)
        full_data.to_csv(OUTPUT_CSV, index=False)
        print(f"Data saved to {OUTPUT_CSV}")
    except Exception as e:
        print(f"Error saving to CSV: {e}")

# Save data to PostgreSQL
def save_to_database(dataframes):
    try:
        connection = psycopg2.connect(DB_URL)
        cursor = connection.cursor()

        for df in dataframes:
            df['Date'] = pd.to_datetime(df['Date']).dt.date  # Convert date format

            # Convert all numeric columns to Python types
            for col in df.select_dtypes(include=['number']).columns:
                df[col] = df[col].astype(object)

            # Convert DataFrame to records for insertion
            records = df.to_records(index=False)

            # Insert data into PostgreSQL
            execute_values(
                cursor,
                f"""
                INSERT INTO {TABLE_NAME} (date, open, high, low, close, adj_close, volume, ticker)
                VALUES %s
                """,
                records
            )

        connection.commit()
        cursor.close()
        connection.close()
        print(f"Data saved to database table {TABLE_NAME}")
    except Exception as e:
        print(f"Error saving to database: {e}")

# Main function to fetch and save market data
def main():
    ensure_table_exists()

    # Get all active U.S. stock tickers from Alpaca
    stock_tickers = get_alpaca_tickers()

    # Add cryptocurrency tickers
    crypto_tickers = ['BTC-USD', 'ETH-USD', 'ADA-USD', 'DOGE-USD', 'SOL-USD']
    
    all_tickers = stock_tickers + crypto_tickers
    dataframes = []

    for ticker in all_tickers:
        df = fetch_data(ticker)
        if not df.empty:
            dataframes.append(df)

    if dataframes:
        save_to_csv(dataframes)
        save_to_database(dataframes)
        print("All data fetched, saved to CSV, and stored in the database successfully.")
    else:
        print("No data fetched. Exiting.")

if __name__ == "__main__":
    main()


Table all_market_data ensured to exist.
Error fetching tickers from Alpaca: Invalid URL 'None/v2/assets': No scheme supplied. Perhaps you meant https://None/v2/assets?
Fetching data for BTC-USD...
Data for BTC-USD fetched successfully.
Fetching data for ETH-USD...
Data for ETH-USD fetched successfully.
Fetching data for ADA-USD...
Data for ADA-USD fetched successfully.
Fetching data for DOGE-USD...
Data for DOGE-USD fetched successfully.
Fetching data for SOL-USD...
Data for SOL-USD fetched successfully.
Data saved to market_data.csv
Data saved to database table all_market_data
All data fetched, saved to CSV, and stored in the database successfully.
