In [7]:
import os
import requests
from datetime import datetime
from dotenv import load_dotenv
import csv
import pytz

# Load settings from the project-level env file so that no secret has to be
# written into the notebook itself.
load_dotenv(dotenv_path="../../.envpy")

ALPHA_VANTAGE_API_KEY = os.getenv("ALPHA_VANTAGE_API_KEY")
CSV_OUTPUT_PATH = os.getenv("ALPHA_VANTAGE_CSVDIR")

# Fail fast: os.getenv() returns None for an unset variable, which would
# otherwise only surface later as a request sent without an API key or as a
# file path that literally starts with "None/".
_missing_settings = [
    name
    for name, value in (
        ("ALPHA_VANTAGE_API_KEY", ALPHA_VANTAGE_API_KEY),
        ("ALPHA_VANTAGE_CSVDIR", CSV_OUTPUT_PATH),
    )
    if not value
]
if _missing_settings:
    raise RuntimeError(
        "Missing required environment variable(s): " + ", ".join(_missing_settings)
    )

# Symbol to download; also used in the raw CSV file name below.
STOCK_TICKER = "nvda"

# Time-stamp the raw download so repeated runs never overwrite each other.
now = datetime.now()
timestamp_str = now.strftime("%Y-%m-%d_%H.%M.%S")
csv_filename = f"{CSV_OUTPUT_PATH}/{STOCK_TICKER}_download_{timestamp_str}.csv"

# Multiplier applied to the quoted prices before they are written out as
# whole numbers in the converted rows.
DECI_CENTS_CONVERSION_FACTOR = 10_000


In [8]:

# Request the full 1-minute intraday history for STOCK_TICKER as CSV text.
base_url = "https://www.alphavantage.co/query"
params = {
    "function": "TIME_SERIES_INTRADAY",
    "symbol": STOCK_TICKER,
    "interval": "1min",
    "outputsize": "full", # Full means we get around 1 month of data
    "datatype": "csv",  # can also be "json"
    "apikey": ALPHA_VANTAGE_API_KEY
}

# (connect, read) timeouts in seconds. Without a timeout, requests waits
# indefinitely on a stalled connection and the kernel hangs.
response = requests.get(base_url, params=params, timeout=(10, 60))
response.raise_for_status()  # Ensure we got a successful HTTP status

# Alpha Vantage can report problems (invalid key, rate limit, unknown symbol)
# as a JSON object with HTTP 200, so raise_for_status() alone does not catch
# them. A real CSV payload never starts with "{", so refuse to save such a
# body as if it were price data.
payload = response.text
if payload.lstrip().startswith("{"):
    raise RuntimeError(
        f"Alpha Vantage returned a JSON message instead of CSV data: {payload[:500]}"
    )

# Save to CSV file; newline="" writes the server's line endings unchanged.
with open(csv_filename, "w", newline="") as f:
    f.write(payload)

In [10]:
import io
import zipfile

# Output folder where LEAN engine expects the data
output_dir = f"../../data/equity/usa/minute/{STOCK_TICKER}"

# Make sure the output directory exists
os.makedirs(output_dir, exist_ok=True)

# Timezone used to interpret the CSV timestamps
ny_tz = pytz.timezone("America/New_York")

# Converted rows grouped by trading day: { "YYYYMMDD": [lean_row, ...] }
daily_data = {}

# newline="" lets the csv module do its own line-ending handling, as the csv
# documentation recommends for files passed to csv.reader.
with open(csv_filename, "r", newline="") as f:
    reader = csv.reader(f)

    # Expected header: ["timestamp","open","high","low","close","volume"].
    # Checking it up front turns an empty file, or an error message saved in
    # place of data, into one clear error instead of an IndexError/ValueError
    # somewhere inside the loop.
    header = next(reader, None)
    if not header or header[0].strip().lower() != "timestamp":
        raise ValueError(
            f"{csv_filename} does not look like an intraday CSV download "
            f"(first line: {header!r})"
        )

    for row in reader:
        # row e.g. ["2023-04-06 09:31:00", "290.43", "291.00", "290.10", "290.98", "15000"]
        if not row:
            continue  # tolerate blank lines

        # The raw timestamp is used directly (not stored in `timestamp_str`)
        # so the download time stamp defined earlier in the notebook is not
        # overwritten as a side effect of running this cell.
        bar_time = datetime.strptime(row[0], "%Y-%m-%d %H:%M:%S")

        # NOTE(review): the CSV times are assumed to already be US/Eastern
        # wall-clock times - confirm against the Alpha Vantage documentation.
        # If they were UTC, localize with pytz.utc and convert using
        # .astimezone(ny_tz) instead.
        dt_local = ny_tz.localize(bar_time)

        # Date key for the per-day output file name (YYYYMMDD)
        date_str = dt_local.strftime("%Y%m%d")

        # Milliseconds elapsed since midnight of the bar's own day
        midnight = dt_local.replace(hour=0, minute=0, second=0, microsecond=0)
        millis_after_midnight = int((dt_local - midnight).total_seconds() * 1000)

        # open, high, low, close - in that column order
        prices = [float(value) for value in row[1:5]]
        volume = int(row[5])

        # LEAN-format line: time,open,high,low,close,volume
        lean_row = [str(millis_after_midnight)]
        lean_row += [f"{price * DECI_CENTS_CONVERSION_FACTOR:.0f}" for price in prices]
        lean_row.append(str(volume))

        daily_data.setdefault(date_str, []).append(lean_row)

# --- One ZIP archive per trading day, each holding a single CSV ---
for date_str, rows in daily_data.items():
    # Order the day's bars by their millisecond offset (sorted in place).
    rows.sort(key=lambda lean_row: int(lean_row[0]))

    # Name of the CSV member inside the archive, and where the archive goes.
    member_name = f"{date_str}_{STOCK_TICKER}_minute_trade.csv"
    zip_path = os.path.join(output_dir, f"{date_str}_trade.zip")

    # Render the CSV text in memory so no temporary file is needed.
    buffer = io.StringIO()
    csv.writer(buffer).writerows(rows)

    # Store the rendered text as the archive's only member.
    with zipfile.ZipFile(zip_path, mode="w", compression=zipfile.ZIP_DEFLATED) as archive:
        archive.writestr(member_name, buffer.getvalue())

print("Conversion & zipping complete.")

Conversion & zipping complete.
