In [None]:
# Load environment variables
import os
from dotenv import load_dotenv

# Resolve the folder this notebook lives in: use __file__ when the code runs
# as a script, otherwise fall back to the current working directory.
if '__file__' in locals():
    notebook_dir = os.path.dirname(os.path.abspath(__file__))
else:
    notebook_dir = os.getcwd()

# The .env file is looked up next to the notebook.
env_path = os.path.join(notebook_dir, '.env')
load_dotenv(env_path)

# Report the path that was used (printed regardless of load_dotenv's result).
print(f"✓ Environment variables loaded from: {env_path}")

In [None]:
# Setup and imports
import sys
import os

# Get relative paths
notebook_dir = os.path.dirname(os.path.abspath(__file__)) if '__file__' in locals() else os.getcwd()
project_root = os.path.abspath(os.path.join(notebook_dir, '..'))

# Install FinRL package
%pip install -e {project_root} -q

# Add to path
if project_root not in sys.path:
    sys.path.insert(0, project_root)

import finrl
print(f'Using finrl from: {os.path.dirname(finrl.__file__)}')

In [None]:
# Import required libraries
# FinRL configuration values and the NumPy-based stock trading environment
from finrl.config_tickers import DOW_30_TICKER
from finrl.config import INDICATORS
from finrl.meta.env_stock_trading.env_stocktrading_np import StockTradingEnv

# Numerical / tabular helpers and time utilities
import numpy as np
import pandas as pd
import time
from datetime import datetime, timedelta

# Data download and preprocessing helpers
import yfinance as yf
from finrl.meta.preprocessor.yahoodownloader import YahooDownloader
from finrl.meta.preprocessor.preprocessors import FeatureEngineer, data_split
from finrl import config_tickers
# NOTE(review): INDICATORS is already imported near the top of this cell, so
# the next line is redundant.
from finrl.config import INDICATORS
# NOTE(review): the star import pulls every public name from finrl.config into
# the notebook namespace, which hides where names come from.
from finrl.config import *
import itertools

# Processor classes used for the Alpaca download and cleaning steps below
from finrl.meta.data_processor import DataProcessor
from finrl.meta.data_processors.processor_alpaca import AlpacaProcessor

print("✓ All libraries imported successfully")

In [None]:
# Load API credentials from environment
# Each value is None when the corresponding variable is not set.
_read_env = os.environ.get
API_KEY = _read_env('API_KEY')
API_SECRET = _read_env('API_SECRET')
API_BASE_URL = _read_env('API_BASE_URL')
data_url = _read_env('DATA_URL')


def _env_int(name, default):
    """Return environment variable `name` parsed as int, or int(default) if unset."""
    return int(os.environ.get(name, default))


def _notebook_file(env_name, default_name):
    """Join the notebook directory with the file name from `env_name` (or `default_name`)."""
    return os.path.join(notebook_dir, os.environ.get(env_name, default_name))


# Configuration from environment (both values are passed to time.sleep later)
API_RETRY_WAIT = _env_int('API_RETRY_WAIT', '120')
API_REQUEST_SLEEP = _env_int('API_REQUEST_SLEEP', '50')

# File paths (relative to notebook directory)
BUFFER_CSV_PATH = _notebook_file('BUFFER_CSV_PATH', 'buffer.csv')
# NOTE(review): the default 'ou.csv' looks truncated — confirm the intended name.
OUTPUT_CSV_PATH = _notebook_file('OUTPUT_CSV_PATH', 'ou.csv')
AFTER_CLEAN_DATA_CSV = _notebook_file('AFTER_CLEAN_DATA_CSV', 'after_clean_data.csv')
TRAIN_DATA_CSV = _notebook_file('TRAIN_DATA_CSV', 'train_data.csv')
TRADE_DATA_CSV = _notebook_file('TRADE_DATA_CSV', 'trade_data.csv')

print("✓ Configuration loaded")
print(f"  API Base URL: {API_BASE_URL}")
print(f"  Data output path: {OUTPUT_CSV_PATH}")

In [None]:
# Initialize Data Processor
# Bundle the Alpaca credentials read from the environment and hand them to the
# FinRL DataProcessor together with the data source name.
_alpaca_credentials = {
    'API_KEY': API_KEY,
    'API_SECRET': API_SECRET,
    'API_BASE_URL': API_BASE_URL,
}
DP = DataProcessor(data_source='alpaca', **_alpaca_credentials)

print("✓ DataProcessor initialized")

## 1. Data Download

Download minute-level (`1Min`) bars from the Alpaca API for the 28 Dow 30 tickers used in this notebook (WBA and TRV are excluded).

In [None]:
# Define ticker list (Dow 30, excluding problematic tickers)
# Written as one whitespace-separated string; split() yields the list in the
# same order as it is written here.
ticker_list = (
    "AXP AMGN AAPL BA CAT CSCO CVX GS HD HON "
    "IBM INTC JNJ KO JPM MCD MMM MRK MSFT NKE "
    "PG UNH V VZ WMT DIS DOW CRM"
).split()

# Months to download (adjust as needed)
# Each entry is a (start_date, end_date) pair of 'YYYY-MM-DD' strings.
MONTHS_NEEDING_DOWNLOAD = [
    # 2024
    ('2024-07-01', '2024-08-01'),
    ('2024-08-02', '2024-09-02'),
    ('2024-09-03', '2024-10-03'),
    ('2024-10-04', '2024-11-04'),
    ('2024-11-05', '2024-12-05'),
    ('2024-12-06', '2025-01-06'),
    # 2025
    ('2025-01-07', '2025-02-07'),
    ('2025-02-08', '2025-03-08'),
    ('2025-03-09', '2025-04-09'),
    ('2025-04-10', '2025-05-10'),
    ('2025-05-11', '2025-06-11'),
    ('2025-06-12', '2025-07-12'),
    ('2025-07-13', '2025-08-13'),
    ('2025-08-14', '2025-09-14'),
    ('2025-09-15', '2025-10-15'),
    ('2025-10-16', '2025-11-09'),
]

print("✓ Configuration ready")
print(f"  Tickers: {len(ticker_list)}")
print(f"  Months to download: {len(MONTHS_NEEDING_DOWNLOAD)}")

In [None]:
# Load existing data if available
# A previously saved buffer CSV (if any) seeds the list of frames; its
# timestamps are parsed as UTC and then made timezone-naive.
try:
    existing_df = pd.read_csv(BUFFER_CSV_PATH, index_col=0)
except FileNotFoundError:
    existing_df = None
    print("⚠ No existing data file found. Will create new one.")
else:
    buffered_ts = pd.to_datetime(existing_df['timestamp'], utc=True)
    existing_df['timestamp'] = buffered_ts.dt.tz_localize(None)
    print(f"✓ Loaded existing data: {len(existing_df):,} records")
    print(f"  Date range: {existing_df['timestamp'].min()} to {existing_df['timestamp'].max()}")

# Download data for each month
all_data_frames = []
if existing_df is not None:
    all_data_frames.append(existing_df)
downloaded_count = 0
failed_count = 0

print("\n" + "=" * 80)
print("DOWNLOADING DATA")
print("=" * 80)

max_attempts = 3

for window_start, window_end in MONTHS_NEEDING_DOWNLOAD:
    month_label = window_start[:7]
    print(f"\n[{month_label}] Downloading data from {window_start} to {window_end}...")

    # Try the window up to max_attempts times; any outcome other than a
    # retryable exception ends the attempt loop.
    for attempt in range(1, max_attempts + 1):
        try:
            window_df = DP.download_data(
                start_date=window_start,
                end_date=window_end,
                ticker_list=ticker_list,
                time_interval='1Min'
            )

            if window_df is None or len(window_df) == 0:
                # An empty answer is counted as a failure and is not retried
                print(f"✗ {month_label}: No data returned")
                failed_count += 1
            else:
                all_data_frames.append(window_df)
                downloaded_count += 1
                print(f"✓ {month_label}: Downloaded {len(window_df):,} records")
            break

        except Exception as exc:
            if attempt >= max_attempts:
                print(f"✗ {month_label}: Failed after {max_attempts} attempts")
                failed_count += 1
                break
            print(f"✗ {month_label}: API Error - {str(exc)}")
            print(f"  Retrying in {API_RETRY_WAIT} seconds... (Attempt {attempt}/{max_attempts})")
            time.sleep(API_RETRY_WAIT)

    # Sleep between requests (skipped for the last configured window)
    if window_start != MONTHS_NEEDING_DOWNLOAD[-1][0]:
        time.sleep(API_REQUEST_SLEEP)

print("\n" + "=" * 80)
print("Download Summary:")
print(f"  Successfully downloaded: {downloaded_count} months")
print(f"  Failed: {failed_count} months")

In [None]:
# Combine all data
# Concatenates the buffered frame (if any) with the freshly downloaded windows,
# normalises timestamps, removes duplicate (timestamp, tic) rows and rewrites
# the buffer CSV.
print("\n" + "=" * 80)
print("COMBINING AND CLEANING DATA")
print("=" * 80)

if len(all_data_frames) > 0:
    df_raw = pd.concat(all_data_frames, ignore_index=True)
    print(f"✓ Combined dataframe: {len(df_raw):,} total records")

    # Rename if needed
    if 'date' in df_raw.columns:
        df_raw.rename(columns={'date': 'timestamp'}, inplace=True)

    # Convert timestamps: parse as UTC, then drop the timezone
    df_raw['timestamp'] = pd.to_datetime(df_raw['timestamp'], utc=True)
    df_raw['timestamp'] = df_raw['timestamp'].dt.tz_localize(None)

    # Remove duplicates (keep latest).
    # The sort must be stable: rows with equal timestamps then keep their
    # concatenation order (buffer first, new downloads after), so keep='last'
    # really retains the most recently downloaded row. The default quicksort
    # gives no such guarantee.
    df_raw = (
        df_raw
        .sort_values('timestamp', kind='mergesort')
        .drop_duplicates(subset=['timestamp', 'tic'], keep='last')
    )
    print(f"✓ After removing duplicates: {len(df_raw):,} records")
    
    # Save to CSV (index is written; the loader reads it back with index_col=0)
    df_raw.to_csv(BUFFER_CSV_PATH)
    print(f"✓ Saved raw data to: {os.path.basename(BUFFER_CSV_PATH)}")
    
    print(f"\n  Date range: {df_raw['timestamp'].min()} to {df_raw['timestamp'].max()}")
    print(f"  Unique tickers: {df_raw['tic'].nunique()}")
else:
    # NOTE: df_raw is not defined on this path, so later cells cannot run.
    print("⚠ No data downloaded.")

## 2. Data Verification and Quality Analysis

In [None]:
# Verify and analyze the downloaded data
# Read-only report: df_raw itself is never modified here, so the cell can be
# re-run (or fail midway) without leaving a temporary column behind.
print("=" * 80)
print("DATA VERIFICATION AND QUALITY ANALYSIS")
print("=" * 80)

# Check basic statistics
print(f"\nTotal Records: {len(df_raw):,}")
print(f"Date Range: {df_raw['timestamp'].min()} to {df_raw['timestamp'].max()}")
print(f"Unique Tickers: {df_raw['tic'].nunique()}")
print(f"Total Days: {(df_raw['timestamp'].max() - df_raw['timestamp'].min()).days}")

# Analyze by month: build the month key on a small side frame (ticker + month)
# instead of writing a scratch column into df_raw.
month_view = df_raw[['tic']].assign(
    year_month=pd.to_datetime(df_raw['timestamp']).dt.to_period('M')
)
records_per_month = month_view.groupby(['year_month', 'tic']).size().reset_index(name='records')
pivot_table = records_per_month.pivot(index='year_month', columns='tic', values='records')

print("\n" + "=" * 80)
print("RECORDS PER MONTH")
print("=" * 80)
print(pivot_table.fillna(0).astype(int))

# Calculate completeness by month
expected_records = 390 * 21  # ~8,190 per ticker per month
# The expected total is the same for every month, so compute it once.
total_expected = expected_records * len(ticker_list)
print("\n" + "=" * 80)
print("COMPLETENESS BY MONTH")
print("=" * 80)

for month, month_data in pivot_table.iterrows():
    total_actual = month_data.sum()  # missing (NaN) ticker counts are skipped
    completeness_pct = (total_actual / total_expected) * 100
    
    status = "✓ COMPLETE" if completeness_pct >= 95 else "⚠ INCOMPLETE"
    print(f"{month}: {completeness_pct:6.1f}% ({int(total_actual):,} / {total_expected:,}) {status}")

print("\n✓ Data verification complete")

## 3. Data Cleaning with AlpacaProcessor

In [None]:
# Initialize AlpacaProcessor for cleaning
# Attributes assigned on the processor after construction.
_cleaning_settings = {
    "start": "2024-07-01",
    "end": "2025-11-09",
    "time_interval": "1Min",
}

alpaca = AlpacaProcessor(API_KEY=API_KEY, API_SECRET=API_SECRET, API_BASE_URL=API_BASE_URL)
for _attr, _value in _cleaning_settings.items():
    setattr(alpaca, _attr, _value)

print("✓ AlpacaProcessor initialized")

In [None]:
# Force UTC timezone
# The raw timestamps are re-parsed as UTC-aware values before cleaning.
utc_timestamps = pd.to_datetime(df_raw["timestamp"], utc=True)
df_raw["timestamp"] = utc_timestamps

# Clean the data
print("Cleaning data...")
processed_df = alpaca.clean_data(df_raw)

print("✓ Data cleaned")
print(f"  Records: {len(processed_df):,}")
print(f"  Columns: {list(processed_df.columns)}")

# Check for zeros: count exact-zero cells per column, largest count first
zero_counts = processed_df.eq(0).sum()
zero_summary = zero_counts.sort_values(ascending=False)
print(f"\nZeros per column:\n{zero_summary[zero_summary > 0]}")

In [None]:
# Sort and check for NaN values
# Order rows by timestamp, then ticker, and renumber the index from zero.
processed_df = processed_df.sort_values(by=["timestamp", "tic"])
processed_df = processed_df.reset_index(drop=True)
print("✓ Data sorted by timestamp and ticker")

# Report only the columns that contain at least one NaN
print("\nNaN values per column:")
nan_per_column = processed_df.isna().sum()
print(nan_per_column[nan_per_column > 0])

## 4. Add Technical Indicators

In [None]:
# Add VIX
# Passes the cleaned, sorted frame through alpaca.add_vix and rebinds
# processed_df to the result.
# NOTE(review): because processed_df is overwritten, re-running this cell feeds
# the already-augmented frame back into add_vix — confirm that is harmless.
print("Adding VIX data...")
processed_df = alpaca.add_vix(processed_df)
print("✓ VIX data added")

In [None]:
# Add technical indicators
from finrl.config import INDICATORS

print(f"Adding technical indicators: {INDICATORS}")

# Re-parse timestamps as UTC-aware values before computing the indicators
utc_timestamps = pd.to_datetime(processed_df["timestamp"], utc=True)
processed_df["timestamp"] = utc_timestamps
processed_df_tech = alpaca.add_technical_indicator(processed_df, INDICATORS)

tech_columns = processed_df_tech.columns
print("✓ Technical indicators added")
print(f"  Total columns: {len(tech_columns)}")
print(f"  Sample columns: {list(tech_columns[:10])}")

# Check for zeros: per-column count of exact-zero cells, top ten shown
zero_counts = processed_df_tech.eq(0).sum()
zero_summary = zero_counts.sort_values(ascending=False)
print(f"\nZeros in technical indicators:\n{zero_summary[zero_summary > 0].head(10)}")

In [None]:
# Rename timestamp to date (for compatibility with FinRL)
# The rename is applied in place on processed_df_tech; the split and reporting
# cells further down read the 'date' column. Re-running the cell is harmless:
# once 'timestamp' is gone the rename has nothing left to change.
processed_df_tech.rename(columns={'timestamp': 'date'}, inplace=True)
print("✓ Renamed 'timestamp' column to 'date'")

## 5. Train/Test Split

In [None]:
# Split data into training and trading periods
# Training: July 2024 - July 2025
# Trading: Aug 2025 - Nov 2025
#
# FinRL's data_split keeps rows with start <= date < end, i.e. the end bound is
# EXCLUSIVE. The two periods therefore have to share one boundary: ending the
# training period at '2025-07-31' while starting trading at '2025-08-01' would
# silently drop every bar recorded on 2025-07-31.
TRAIN_START = '2024-07-01'
TRADE_START = '2025-08-01'  # also the exclusive end of the training period
TRADE_END = '2025-11-09'    # exclusive

train = data_split(processed_df_tech, TRAIN_START, TRADE_START)
trade = data_split(processed_df_tech, TRADE_START, TRADE_END)

print("=" * 80)
print("DATA SPLIT")
print("=" * 80)
print(f"Training data size: {len(train):,}")
print(f"Trading data size: {len(trade):,}")

print("\nTraining date range:")
print(f"  Start: {train['date'].min()}")
print(f"  End: {train['date'].max()}")

print("\nTrading date range:")
print(f"  Start: {trade['date'].min()}")
print(f"  End: {trade['date'].max()}")

In [None]:
# Fill NaN values in technical indicators
def _fill_within_ticker(frame):
    """Forward-fill then backward-fill NaNs separately for every ticker.

    The frames are in long format (one row per date and ticker, tickers
    interleaved), so a plain ``frame.ffill()`` would copy a value from the
    previous row, which belongs to a different ticker. Grouping by the 'tic'
    column keeps every fill inside one ticker's own time series.

    Row order, index and column order of ``frame`` are preserved.
    """
    tickers = frame['tic'].to_numpy()
    # Group by the ticker values positionally (plain array key), so the frame's
    # own index is never used for alignment.
    values = frame.drop(columns='tic')
    values = values.groupby(tickers, sort=False).ffill()
    # After the forward fill, the backward fill can only reach rows that come
    # before a ticker's first valid value.
    values = values.groupby(tickers, sort=False).bfill()
    # Put the ticker column back at its original position
    values.insert(frame.columns.get_loc('tic'), 'tic', frame['tic'].values)
    return values


print("\n" + "=" * 80)
print("HANDLING NaN VALUES")
print("=" * 80)

print("Before NaN handling:")
nan_counts = train.isna().sum()
print(f"  Total NaN values: {nan_counts.sum()}")
if nan_counts.sum() > 0:
    print(nan_counts[nan_counts > 0])

# Forward fill first, then backward fill (per ticker)
train = _fill_within_ticker(train)
trade = _fill_within_ticker(trade)

print("\nAfter NaN handling:")
print(f"  Training NaN values: {train.isna().sum().sum()}")
print(f"  Trading NaN values: {trade.isna().sum().sum()}")

# Verify no NaN values. A column that is entirely NaN for one ticker cannot be
# filled from that ticker's own data and is reported here instead of being
# papered over with another ticker's values.
assert not train.isna().any().any(), "Training data should not have NaN values"
assert not trade.isna().any().any(), "Trading data should not have NaN values"
print("\n✓ All NaN values handled successfully")

## 6. Save Processed Data

In [None]:
# Save processed data
# Write the training split, the trading split and the full processed frame to
# their configured CSV paths, in that order.
_outputs = (
    (train, TRAIN_DATA_CSV),
    (trade, TRADE_DATA_CSV),
    (processed_df_tech, AFTER_CLEAN_DATA_CSV),  # also save the full processed data
)
for _frame, _csv_path in _outputs:
    _frame.to_csv(_csv_path)

print(f"✓ Full processed data saved to: {os.path.basename(AFTER_CLEAN_DATA_CSV)}")