In [1]:
import os
import requests
import json
import pandas as pd
from datetime import datetime
from dotenv import load_dotenv
from requests import Response

# Work from the directory one level above this notebook, then load environment
# variables with load_dotenv() (no explicit path). NOTE(review): presumably the
# project's .env lives in that parent directory — verify.
# The notebook directory is remembered on the first run so that re-executing
# this cell does not keep climbing further up the directory tree.
NOTEBOOK_DIR = globals().get("NOTEBOOK_DIR", os.getcwd())
os.chdir(os.path.dirname(NOTEBOOK_DIR))
load_dotenv()

True

In [2]:
def format_bulk_stocks_eod(ticker: str, df: pd.DataFrame) -> bytes:
    """Serialize daily OHLCV rows into an Elasticsearch ``_bulk`` NDJSON payload.

    Each usable row becomes two lines. The first is an ``index`` action that
    targets ``quant-agents_stocks-eod_<today>`` with ``_id``
    ``<ticker>_<timestamp>``. The second is the document itself.

    Args:
        ticker: Symbol stored in ``key_ticker`` and used in each ``_id``.
        df: Frame with ``timestamp``, ``open``, ``high``, ``low``, ``close``
            and ``volume`` columns (the shape of the Alpha Vantage
            TIME_SERIES_DAILY CSV read by ``ingest_stocks_eod``).

    Returns:
        UTF-8 encoded NDJSON terminated by a newline. Rows missing a timestamp,
        open or close are skipped. Other missing values are written as JSON
        ``null`` rather than the invalid JSON token ``NaN``.
    """
    today = datetime.now().strftime('%Y-%m-%d')
    index_name = f"quant-agents_stocks-eod_{today}"

    def optional_float(value):
        # pandas reports empty cells as NaN (not None), and NaN is not valid JSON.
        return None if pd.isna(value) else float(value)

    lines = []
    for _, row in df.iterrows():
        date_reference = row.get('timestamp')
        open_ = row.get('open')
        close = row.get('close')
        # pd.isna covers both None (column absent) and NaN (empty cell);
        # a plain `is None` check never catches NaN.
        if pd.isna(date_reference) or pd.isna(open_) or pd.isna(close):
            continue

        volume = row.get('volume')
        doc = {
            "key_ticker": ticker,
            "date_reference": date_reference,
            "val_open": float(open_),
            "val_close": float(close),
            "val_high": optional_float(row.get('high')),
            "val_low": optional_float(row.get('low')),
            # int(NaN) raises ValueError, so missing volume must map to None first.
            "val_volume": None if pd.isna(volume) else int(volume),
        }
        meta = {"index": {"_index": index_name, "_id": f"{ticker}_{date_reference}"}}

        lines.append(json.dumps(meta))
        lines.append(json.dumps(doc))

    # The _bulk API requires the body to end with a newline.
    return ("\n".join(lines) + "\n").encode("utf-8")

def ingest_stocks_eod(ticker: str, timeout: float = 30.0) -> Response:
    """Download daily prices for ``ticker`` from Alpha Vantage and bulk-index them.

    Reads ``ELASTICSEARCH_URL``, ``ELASTICSEARCH_API_KEY`` and
    ``ALPHAVANTAGE_API_KEY`` from the environment. Fetches the
    TIME_SERIES_DAILY series as CSV, converts it with
    ``format_bulk_stocks_eod``, and POSTs the result to
    ``<ELASTICSEARCH_URL>/_bulk``.

    Args:
        ticker: Symbol to fetch and index.
        timeout: Seconds passed as ``timeout`` to each HTTP request, so a
            stalled server cannot hang the notebook indefinitely.

    Returns:
        The Elasticsearch ``_bulk`` response. Callers should check its HTTP
        status and the ``errors`` flag in its JSON body.

    Raises:
        KeyError: if a required environment variable is unset.
        requests.HTTPError: if Alpha Vantage answers with an HTTP error status.
        ValueError: if Alpha Vantage returns something other than the expected
            price CSV.
    """
    import io

    es_url = os.environ['ELASTICSEARCH_URL'].rstrip('/')
    es_api_key = os.environ['ELASTICSEARCH_API_KEY']
    alpha_vantage_api_key = os.environ['ALPHAVANTAGE_API_KEY']

    # Let requests build and URL-encode the query string, and bound the wait.
    av_response = requests.get(
        "https://www.alphavantage.co/query",
        params={
            "function": "TIME_SERIES_DAILY",
            "symbol": ticker,
            "apikey": alpha_vantage_api_key,
            "datatype": "csv",
        },
        timeout=timeout,
    )
    av_response.raise_for_status()
    body = av_response.text

    # NOTE(review): an error/rate-limit notice from Alpha Vantage presumably
    # comes back as a JSON object rather than CSV. Fail loudly instead of
    # silently indexing nothing.
    if body.lstrip().startswith("{"):
        raise ValueError(f"Unexpected Alpha Vantage response for {ticker!r}: {body[:200]}")

    ticker_daily_time_series = pd.read_csv(io.StringIO(body))
    missing = {'timestamp', 'open', 'close'} - set(ticker_daily_time_series.columns)
    if missing:
        raise ValueError(
            f"Alpha Vantage CSV for {ticker!r} lacks columns {sorted(missing)}: {body[:200]}"
        )

    return requests.post(
        url=f"{es_url}/_bulk",
        headers={
            'Authorization': f'ApiKey {es_api_key}',
            'Content-Type': 'application/x-ndjson'
        },
        data=format_bulk_stocks_eod(ticker, ticker_daily_time_series),
        timeout=timeout,
    )


In [6]:
stocks_eod_response = ingest_stocks_eod("NVDA")
# Fail on HTTP-level errors (e.g. rejected API key, bad request) instead of
# printing whatever `.get('errors')` happens to find in an error body
# (typically None).
stocks_eod_response.raise_for_status()
print(f"Stocks EOD ingestion complete with errors: {stocks_eod_response.json().get('errors')}")

Stocks EOD ingestion complete with errors: False


In [22]:
def format_bulk_stocks_insider_trades(ticker: str, df: pd.DataFrame) -> bytes:
    """Serialize insider transactions into an Elasticsearch ``_bulk`` NDJSON payload.

    Each usable row becomes two lines. The first is an ``index`` action that
    targets ``quant-agents_stocks-insider-trades_<today>``. The second is the
    document itself. Document fields follow the ``key_``/``val_`` prefixes used
    for EOD documents. NOTE(review): these field names were reconstructed (the
    previous body copied the EOD fields); confirm them against the index mapping.

    Args:
        ticker: Symbol stored in ``key_ticker`` and used in each ``_id``.
        df: Frame with ``transaction_date``, ``executive``,
            ``executive_title``, ``acquisition_or_disposal``, ``shares`` and
            ``share_price`` columns. Dates may be strings or datetime-like
            values. Numeric columns may be numbers or numeric strings.

    Returns:
        UTF-8 encoded NDJSON terminated by a newline. Rows without a
        transaction date or share count are skipped. Other missing values are
        written as JSON ``null``.
    """
    import hashlib

    today = datetime.now().strftime('%Y-%m-%d')
    index_name = f"quant-agents_stocks-insider-trades_{today}"

    def is_missing(value) -> bool:
        # Treat None, NaN/NaT and blank strings alike.
        if value is None:
            return True
        if isinstance(value, str):
            return not value.strip()
        return bool(pd.isna(value))

    def optional_text(value):
        return None if is_missing(value) else str(value)

    def optional_float(value):
        return None if is_missing(value) else float(value)

    lines = []
    # Several trades can share a ticker and date, so the _id is derived from
    # the whole trade. The per-trade counter keeps genuinely identical rows
    # from overwriting each other.
    occurrences = {}

    for _, row in df.iterrows():
        date_reference = row.get('transaction_date')
        shares = row.get('shares')
        if is_missing(date_reference) or is_missing(shares):
            continue

        # Datetime values (the caller applies pd.to_datetime) are not JSON
        # serializable; strings are kept as-is.
        if hasattr(date_reference, 'strftime'):
            date_str = date_reference.strftime('%Y-%m-%d')
        else:
            date_str = str(date_reference)

        doc = {
            "key_ticker": ticker,
            "date_reference": date_str,
            "key_executive": optional_text(row.get('executive')),
            "key_executive_title": optional_text(row.get('executive_title')),
            "key_acquisition_or_disposal": optional_text(row.get('acquisition_or_disposal')),
            "val_shares": float(shares),
            "val_share_price": optional_float(row.get('share_price')),
        }

        trade_key = json.dumps(doc, sort_keys=True)
        occurrence = occurrences.get(trade_key, 0)
        occurrences[trade_key] = occurrence + 1
        # SHA-1 is used only for a stable, compact id, not for security.
        digest = hashlib.sha1(f"{trade_key}|{occurrence}".encode("utf-8")).hexdigest()[:16]
        meta = {"index": {"_index": index_name, "_id": f"{ticker}_{date_str}_{digest}"}}

        lines.append(json.dumps(meta))
        lines.append(json.dumps(doc))

    # The _bulk API requires the body to end with a newline.
    return ("\n".join(lines) + "\n").encode("utf-8")

def ingest_stocks_insider_trades(ticker: str, lookback_days: int = 100, timeout: float = 30.0) -> Response:
    """Fetch recent insider transactions for ``ticker`` and bulk-index them.

    Reads ``ELASTICSEARCH_URL``, ``ELASTICSEARCH_API_KEY`` and
    ``ALPHAVANTAGE_API_KEY`` from the environment. Calls the Alpha Vantage
    INSIDER_TRANSACTIONS endpoint and keeps trades dated within the last
    ``lookback_days`` days (unparseable dates are dropped). The trades are
    then serialized with ``format_bulk_stocks_insider_trades`` and POSTed to
    ``<ELASTICSEARCH_URL>/_bulk``.

    Args:
        ticker: Symbol to fetch and index.
        lookback_days: Only trades on or after ``now - lookback_days`` are indexed.
        timeout: Seconds passed as ``timeout`` to each HTTP request.

    Returns:
        The Elasticsearch ``_bulk`` response. If no trades fall inside the
        window, the bulk body is just a newline, which Elasticsearch may
        reject. Callers should check the response status.

    Raises:
        KeyError: if a required environment variable is unset.
        requests.HTTPError: if Alpha Vantage answers with an HTTP error status.
        ValueError: if the Alpha Vantage JSON has no ``data`` field.
    """
    es_url = os.environ['ELASTICSEARCH_URL'].rstrip('/')
    es_api_key = os.environ['ELASTICSEARCH_API_KEY']
    alpha_vantage_api_key = os.environ['ALPHAVANTAGE_API_KEY']

    # Let requests build and URL-encode the query string, and bound the wait.
    av_response = requests.get(
        "https://www.alphavantage.co/query",
        params={
            "function": "INSIDER_TRANSACTIONS",
            "symbol": ticker,
            "apikey": alpha_vantage_api_key,
        },
        timeout=timeout,
    )
    av_response.raise_for_status()
    av_payload = av_response.json()
    if 'data' not in av_payload:
        # NOTE(review): presumably an error or rate-limit notice. Report it
        # instead of failing with a bare KeyError.
        raise ValueError(f"Unexpected Alpha Vantage response for {ticker!r}: {str(av_payload)[:200]}")

    records = av_payload['data']
    if records:
        trades = pd.json_normalize(records)
    else:
        # json_normalize([]) yields a frame with no columns at all.
        trades = pd.DataFrame(columns=['transaction_date'])

    trades["transaction_date"] = pd.to_datetime(trades["transaction_date"], errors='coerce')
    cutoff = pd.Timestamp.now() - pd.Timedelta(days=lookback_days)
    # NaT never compares >= cutoff, so unparseable dates are dropped here too.
    ticker_recent_insider_trades = trades[trades["transaction_date"] >= cutoff].reset_index(drop=True)

    return requests.post(
        url=f"{es_url}/_bulk",
        headers={
            'Authorization': f'ApiKey {es_api_key}',
            'Content-Type': 'application/x-ndjson'
        },
        data=format_bulk_stocks_insider_trades(ticker, ticker_recent_insider_trades),
        timeout=timeout,
    )

Stocks insider trades ingestion complete with errors: False
