In [None]:
# 02_data_cleaning_preprocessing.py - Optimized Rate Limiting
# Axya Quant Platform - Notebook 02: Preprocessing

import pandas as pd
import numpy as np
from pathlib import Path
import requests
import time
from tqdm import tqdm
import logging
import sys
import json
from typing import List, Dict, Tuple

# --- Configuration ---
# Filesystem layout: raw inputs are read straight from the data directory,
# derived outputs are written to its "processed" subdirectory.
DATA_PATH = Path('./data')
PROCESSED_PATH = DATA_PATH.joinpath('processed')
RAW_PATH = DATA_PATH
# parents=True means this also creates DATA_PATH itself when it is missing.
PROCESSED_PATH.mkdir(exist_ok=True, parents=True)

# FMP API access (both values are blank in this copy of the file).
FMP_BASE_URL = ''
FMP_API_KEY = ''

# Request budgeting and pacing.
INITIAL_USED = 0          # credits counted as already spent at start-up
MAX_DAILY_REQUESTS = 250  # stop issuing live requests at this count
REQUEST_INTERVAL = 1.2    # seconds between requests (60 / 1.2 = 50 per minute)
MAX_RETRIES = 3           # attempts per ticker
RETRY_BACKOFF = 5         # base wait in seconds after an HTTP 429

# Configure logging
# Every record is written both to a log file inside the data directory and
# to stdout.
_log_handlers = [
    logging.FileHandler(DATA_PATH / 'preprocessing.log'),
    logging.StreamHandler(sys.stdout),
]
logging.basicConfig(
    handlers=_log_handlers,
    format='%(asctime)s - %(levelname)s - %(name)s - %(message)s',
    level=logging.INFO,
)
# Named logger used by the rest of this file.
logger = logging.getLogger("AxyaPreprocessor")

# --- Data Loading ---
def load_raw_data() -> Tuple[pd.DataFrame, pd.DataFrame]:
    """Load the raw OHLCV and macro tables and normalise their dtypes.

    Reads ``ohlcv.parquet`` and ``macro_data.parquet`` from ``RAW_PATH``.
    The four price columns are cast to ``float32``, ``volume`` to ``int64``,
    and the ``date`` column of both tables is parsed with ``pd.to_datetime``.

    Returns:
        A ``(stock_data, macro_data)`` tuple of DataFrames.

    Raises:
        Exception: Whatever the read, cast or date parsing raised. The error
            is logged first and then re-raised unchanged.
    """
    logger.info("Loading raw data...")

    column_dtypes = {
        'open': 'float32', 'high': 'float32',
        'low': 'float32', 'close': 'float32',
        # int64, not int32: split-adjusted share volumes can exceed
        # 2**31 - 1, and a narrower cast would overflow.
        'volume': 'int64',
    }

    try:
        stock_data = pd.read_parquet(RAW_PATH / 'ohlcv.parquet').astype(column_dtypes)
        stock_data['date'] = pd.to_datetime(stock_data['date'])

        macro_data = pd.read_parquet(RAW_PATH / 'macro_data.parquet')
        macro_data['date'] = pd.to_datetime(macro_data['date'])

        return stock_data, macro_data
    except Exception as e:
        logger.error(f"Data loading failed: {str(e)}")
        raise

# --- Enhanced API Handler ---
class FMPRateController:
    """Rate-limited, disk-cached fetcher for FMP annual balance-sheet data.

    Keeps a running count of spent API credits, spaces live requests at
    least ``REQUEST_INTERVAL`` seconds apart, and retries HTTP 429 responses
    with exponential backoff (``RETRY_BACKOFF * 2 ** attempt`` seconds).
    """

    def __init__(self, initial_used: int = 0):
        """Create a controller.

        Args:
            initial_used: Credits to count as already spent.
        """
        # Live calls counted against MAX_DAILY_REQUESTS so far.
        self.used_credits = initial_used
        # time.time() when the latest live request finished; 0 = none yet.
        self.last_request = 0
        # Not read anywhere in this class; kept so code that touches the
        # attribute keeps working.
        self.retries = 0

    def make_request(self, ticker: str) -> "dict | None":
        """Return the latest validated balance-sheet record for ``ticker``.

        A valid cached record is returned without touching the network or
        the credit counter. Otherwise, if credits remain, the API is queried
        (up to ``MAX_RETRIES`` attempts when rate limited) and a valid first
        record is cached and returned.

        Args:
            ticker: Symbol to look up.

        Returns:
            The record dict, or ``None`` when there is no valid cache entry
            and the budget is spent, the request fails, or the payload does
            not pass ``validate_data``.
        """
        cache_path = DATA_PATH / 'fmp_cache' / f"{ticker}_balance.json"

        # Cache first: a cached record costs no credit, so it has to stay
        # reachable after the daily budget is exhausted.
        cached = self._read_cache(cache_path)
        if cached is not None:
            return cached

        if self.used_credits >= MAX_DAILY_REQUESTS:
            return None

        url = f"{FMP_BASE_URL}/balance-sheet-statement/{ticker}"
        params = {'period': 'annual', 'apikey': FMP_API_KEY}

        for attempt in range(MAX_RETRIES):
            try:
                response = self._send(url, params)
                response.raise_for_status()
                # A successful answer is counted as a spent credit even if
                # the payload later fails validation.
                # NOTE(review): assumes the provider bills every answered
                # call - confirm against the FMP plan.
                self.used_credits += 1
                payload = response.json()
            except requests.HTTPError as e:
                status = getattr(e.response, 'status_code', None)
                if status == 429:
                    wait_time = RETRY_BACKOFF * (2 ** attempt)
                    logger.warning(f"Rate limited: Retrying in {wait_time}s (Attempt {attempt+1})")
                    time.sleep(wait_time)
                    continue
                logger.error(f"HTTP error for {ticker}: {str(e)}")
                return None
            except Exception as e:
                # Boundary with the network: log and give up on this ticker
                # rather than abort the whole run.
                logger.error(f"Request failed for {ticker}: {str(e)}")
                return None

            if isinstance(payload, list) and payload and self.validate_data(payload[0]):
                record = payload[0]
                self._write_cache(cache_path, record)
                return record
            return None

        logger.error(f"Max retries reached for {ticker}")
        return None

    def validate_data(self, data: dict) -> bool:
        """Check that ``data`` is a dict with every required, well-typed field.

        ``symbol`` and ``date`` must be strings; the four balance-sheet
        figures must be ``int`` or ``float``; ``totalAssets`` must also be
        strictly positive. Anything that is not a dict is rejected.
        """
        if not isinstance(data, dict):
            return False

        required = {
            'symbol': str, 'date': str,
            'totalAssets': (int, float), 'totalDebt': (int, float),
            'cashAndShortTermInvestments': (int, float),
            'netReceivables': (int, float)
        }
        return all(
            isinstance(data.get(k), v) and (k != 'totalAssets' or data[k] > 0)
            for k, v in required.items()
        )

    def _send(self, url: str, params: dict):
        """Issue one throttled GET and stamp ``last_request`` when it ends."""
        elapsed = time.time() - self.last_request
        if elapsed < REQUEST_INTERVAL:
            time.sleep(REQUEST_INTERVAL - elapsed)
        try:
            return requests.get(url, params=params, timeout=15)
        finally:
            # Stamp failed calls too, so an error cannot switch off pacing.
            self.last_request = time.time()

    def _read_cache(self, cache_path) -> "dict | None":
        """Return the cached record at ``cache_path`` if present and valid."""
        if not cache_path.exists():
            return None
        try:
            with open(cache_path, encoding='utf-8') as f:
                data = json.load(f)
        except (OSError, ValueError):
            # Unreadable or corrupt cache entry: fall back to a live request.
            return None
        return data if self.validate_data(data) else None

    def _write_cache(self, cache_path, record: dict) -> None:
        """Best-effort cache write; a failure must not lose fetched data."""
        try:
            cache_path.parent.mkdir(parents=True, exist_ok=True)
            with open(cache_path, 'w', encoding='utf-8') as f:
                json.dump(record, f)
        except OSError as e:
            logger.warning(f"Could not write cache file {cache_path}: {e}")

# --- Compliance Analysis ---
def _passes_ratio_screen(
    data: dict,
    max_debt_ratio: float,
    max_receivables_ratio: float,
) -> bool:
    """Return True when debt/assets and receivables/assets are both under their caps."""
    try:
        assets = float(data['totalAssets'])
        debt_ratio = float(data['totalDebt']) / assets
        receivables_ratio = float(data['netReceivables']) / assets
    except (ZeroDivisionError, KeyError, TypeError, ValueError):
        # Missing, non-numeric or zero-asset figures cannot be screened.
        return False
    return debt_ratio < max_debt_ratio and receivables_ratio < max_receivables_ratio


def analyze_compliance(
    tickers: List[str],
    max_debt_ratio: float = 0.33,
    max_receivables_ratio: float = 0.49,
) -> Tuple[Dict[str, bool], int]:
    """Screen every ticker's balance sheet against the two ratio caps.

    Each ticker is looked up through ``FMPRateController.make_request``.
    A ticker is compliant when data is available and both
    ``totalDebt / totalAssets`` and ``netReceivables / totalAssets`` are
    strictly below their caps. Tickers without usable data are recorded as
    non-compliant.

    Once the request budget is spent a single warning is logged and the
    loop keeps going, so every ticker still gets an entry. Whether cached
    records are still served at that point is decided by ``make_request``.

    Args:
        tickers: Symbols to screen.
        max_debt_ratio: Exclusive upper bound for debt / assets.
        max_receivables_ratio: Exclusive upper bound for receivables / assets.

    Returns:
        ``(results, used_credits)``: a ticker -> compliant mapping covering
        every input ticker, and the controller's final credit count.
    """
    handler = FMPRateController(initial_used=INITIAL_USED)
    results = {}
    limit_logged = False

    with tqdm(total=len(tickers), desc="Compliance Analysis") as pbar:
        for ticker in tickers:
            if not limit_logged and handler.used_credits >= MAX_DAILY_REQUESTS:
                logger.warning("Daily API limit reached - using cached data only")
                limit_logged = True

            data = handler.make_request(ticker)
            results[ticker] = bool(data) and _passes_ratio_screen(
                data, max_debt_ratio, max_receivables_ratio
            )

            pbar.update(1)
            pbar.set_postfix({
                'Compliant': sum(results.values()),
                'Used': handler.used_credits,
                'Remaining': MAX_DAILY_REQUESTS - handler.used_credits
            })

    return results, handler.used_credits

# --- Main Pipeline ---
def main():
    """Run the preprocessing pipeline from raw tables to parquet outputs.

    Steps: load the raw tables, as-of merge macro rows onto price rows by
    date, derive 1-day returns and 5-row rolling volatility per ticker, run
    the compliance screen, flag rows with ``is_halal`` and write the
    results under ``PROCESSED_PATH``.

    Any exception is logged at CRITICAL level with its traceback and the
    process exits with status 1.
    """
    try:
        logger.info("=== Pipeline Initialized ===")
        logger.info(f"Starting API credits: {INITIAL_USED}/{MAX_DAILY_REQUESTS}")

        # Load and merge data (merge_asof needs both sides sorted by the key)
        stock_df, macro_df = load_raw_data()
        prices_by_date = stock_df.sort_values('date')
        macro_by_date = macro_df.sort_values('date')
        # NOTE(review): direction='nearest' may pick a macro row dated up to
        # 3 days AFTER the price row - confirm that is intended.
        joined = pd.merge_asof(
            prices_by_date,
            macro_by_date,
            on='date',
            direction='nearest',
            tolerance=pd.Timedelta('3D'),
        )
        merged = joined.dropna(subset=['close', 'volume'])

        # Feature engineering
        merged['returns_1d'] = merged.groupby('ticker')['close'].pct_change()
        rolling_std = (
            merged.groupby('ticker')['returns_1d']
            .rolling(5, min_periods=3)
            .std()
        )
        # Drop the ticker index level so values align with merged's own index.
        merged['volatility_5d'] = rolling_std.reset_index(level=0, drop=True)

        # Compliance analysis
        tickers = merged['ticker'].unique().tolist()
        logger.info(f"Processing {len(tickers)} tickers")
        compliance_map, used_credits = analyze_compliance(tickers)

        # Apply results: tickers absent from the map count as non-compliant
        ticker_flags = merged['ticker'].map(compliance_map)
        merged['is_halal'] = ticker_flags.fillna(False).astype(bool)

        # Save outputs; the filtered file is written only when it has rows
        merged.to_parquet(PROCESSED_PATH / 'full_dataset.parquet')
        halal_mask = merged['is_halal']
        if halal_mask.any():
            merged[halal_mask].to_parquet(PROCESSED_PATH / 'halal_instruments.parquet')

        logger.info(f"Final API usage: {used_credits}/{MAX_DAILY_REQUESTS}")
        logger.info(f"Compliance rate: {merged['is_halal'].mean():.2%}")
        logger.info("=== Pipeline Completed ===")

    except Exception as e:
        logger.critical(f"Pipeline failed: {str(e)}", exc_info=True)
        sys.exit(1)


# Run the pipeline only when executed as a script, not on import.
if __name__ == '__main__':
    main()

2025-05-13 12:18:24,032 - INFO - AxyaPreprocessor - === Pipeline Initialized ===
2025-05-13 12:18:24,032 - INFO - AxyaPreprocessor - Starting API credits: 0/250
2025-05-13 12:18:24,033 - INFO - AxyaPreprocessor - Loading raw data...
2025-05-13 12:18:24,625 - INFO - AxyaPreprocessor - Processing 495 tickers


Compliance Analysis:  40%|▍| 197/495 [06:50<10:39,  2.15s/it, Compliant=134, Use