In [1]:
# Cell 1: Imports and Setup
import requests
import json
from dotenv import load_dotenv
import os
import pandas as pd
from datetime import date, datetime, timedelta
import numpy as np
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
import seaborn as sns
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

# Load environment variables and setup device
load_dotenv()
ALPHA_VANTAGE_API_KEY = os.getenv('ALPHA_VANTAGE_API_KEY')
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

Using device: cpu


In [2]:
# Cell 2: Autoencoder Model and Training Functions
class BollingerAutoencoder(nn.Module):
    def __init__(self, input_dim, encoding_dim_factor=0.5):
        super().__init__()
        hidden_dim = int(input_dim * 0.75)
        bottleneck_dim = int(input_dim * encoding_dim_factor)
        self.encoder = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, bottleneck_dim),
            nn.ReLU()
        )
        self.decoder = nn.Sequential(
            nn.Linear(bottleneck_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, input_dim),
            nn.Sigmoid()
        )
    
    def forward(self, x):
        return self.decoder(self.encoder(x))

def create_sequences(data, sequence_length):
    data_tensor = torch.FloatTensor(data.values)
    sequences = []
    for i in range(data_tensor.shape[1]):
        feature_sequences = data_tensor[:, i].unfold(0, sequence_length, 1)
        sequences.append(feature_sequences)
    return torch.stack(sequences, dim=2).numpy()

def train_autoencoder(model, train_data, test_data, epochs=100, batch_size=32):
    train_tensor = torch.FloatTensor(train_data).to(device)
    test_tensor = torch.FloatTensor(test_data).to(device)
    train_loader = DataLoader(TensorDataset(train_tensor, train_tensor), 
                            batch_size=batch_size, shuffle=True)
    
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    
    model.train()
    for epoch in range(epochs):
        total_loss = 0
        for batch_x, _ in train_loader:
            outputs = model(batch_x)
            loss = criterion(outputs, batch_x)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        if (epoch + 1) % 10 == 0:
            print(f'Epoch [{epoch+1}/{epochs}], Loss: {total_loss/len(train_loader):.6f}')
    
    model.eval()
    with torch.no_grad():
        test_loss = criterion(model(test_tensor), test_tensor)
    print(f'Test Loss: {test_loss.item():.6f}')
    return model

In [None]:
# Cell 3: Data Processing and Visualization Functions
def fetch_and_process_bbands(params, interval_name):
    response = requests.get("https://www.alphavantage.co/query", params=params)
    df = pd.DataFrame.from_dict(response.json()['values']).astype(float)
    df.index = pd.to_datetime(df.index)
    return df.sort_index()

def visualize_bollinger_bands(df, interval_name):
    plt.figure(figsize=(12, 8))
    plt.plot(df['Real Middle Band'], label='Middle Band')
    plt.plot(df['Real Upper Band'], label='Upper Band')
    plt.plot(df['Real Lower Band'], label='Lower Band')
    plt.title(f'Bollinger Bands ({interval_name})')
    plt.legend()
    plt.grid(True)
    plt.tight_layout()
    plt.savefig(f'{SYMBOL}_bbands_{interval_name}.png')
    plt.show()

In [5]:
# Cell 4: Anomaly Detection Functions
def detect_anomalies(df, interval_name, encoding_dim_factor=0.5, threshold_factor=3.0):
    features = df[['Real Upper Band', 'Real Middle Band', 'Real Lower Band']]
    scaler = MinMaxScaler()
    data_scaled = scaler.fit_transform(features)
    
    train_data, test_data = train_test_split(data_scaled, test_size=0.2, random_state=42)
    model = BollingerAutoencoder(data_scaled.shape[1], encoding_dim_factor).to(device)
    model = train_autoencoder(model, train_data, test_data)
    
    with torch.no_grad():
        reconstructions = model(torch.FloatTensor(data_scaled).to(device)).cpu().numpy()
    mse = np.mean((data_scaled - reconstructions) ** 2, axis=1)
    threshold = np.mean(mse) + threshold_factor * np.std(mse)
    
    plt.figure(figsize=(12, 6))
    plt.plot(mse)
    plt.axhline(threshold, color='r', linestyle='--')
    plt.title(f'Reconstruction Error ({interval_name})')
    plt.show()
    
    return mse > threshold

In [6]:
# Cell 5: Main Execution
if __name__ == "__main__":
    SYMBOL = 'AAPL'
    params_5min = {
        "function": "BBANDS",
        "symbol": SYMBOL,
        "interval": "5min",
        "time_period": "20",
        "series_type": "close",
        "apikey": ALPHA_VANTAGE_API_KEY
    }
    
    params_1hour = {**params_5min, "interval": "60min"}
    
    # Fetch and analyze data
    for interval, params in [("5-minute", params_5min), ("1-hour", params_1hour)]:
        df = fetch_and_process_bbands(params, interval)
        visualize_bollinger_bands(df, interval)
        anomalies = detect_anomalies(df, interval)
        print(f"Detected {sum(anomalies)} anomalies in {interval} data")

KeyError: 'values'