In [1]:
from vnstock import Quote
import pandas as pd
import time
from pathlib import Path
from collections import deque
from typing import List, Tuple

In [None]:
# Configuration
# Date window handed to fetch_stock_history() for every symbol.
START_DATE = "2022-10-31"
END_DATE = "2025-10-31"
# Number of per-symbol frames buffered in memory before they are appended to disk.
BATCH_SIZE = 30
# Attempts made inside a single fetch_stock_history() call.
MAX_RETRIES = 8
# Times a symbol may be put back on the queue after a retriable failure.
MAX_REQUEUES = 2

# File paths (relative paths are resolved against the current working directory;
# note that the failure log is written next to the notebook, not under data/raw).
SYMBOLS_CSV = Path("../../data/raw/vietnam_stock_symbols.csv")
OUTPUT_PATH = Path("../../data/raw/ta/vietnam_stock_price_history_2022-10-31_2025-10-31.csv")
FAILURES_PATH = Path("vietnam_stock_price_history_failures.csv")

# Error classification keywords.
# classify_error() lower-cases the error text and does plain substring matching,
# so every entry below MUST be lowercase or it can never match.
RETRIABLE_KEYWORDS = (
    "429", "too many requests", "timeout", "timed out", "temporarily blocked",
    "max retries", "failed to establish a new connection", "connection aborted",
    "connection reset", "read timed out", "temporarily unavailable",
    "try again later", "rate limit", "retryerror", "systemexit",
)

# The Vietnamese phrase is listed both with diacritics and in its ASCII
# transliteration. It was previously stored as mis-decoded bytes (mojibake),
# which could not match any real error message.
NON_RETRIABLE_KEYWORDS = (
    "invalid symbol", "khong ton tai", "does not exist", "not found",
    "no data", "valueerror", "không tìm thấy dữ liệu", "khong tim thay du lieu",
)


def load_symbols(csv_path: "Path | None" = None) -> List[str]:
    """Load, clean and deduplicate ticker symbols from a CSV file.

    Args:
        csv_path: CSV file with a ``symbol`` column. Defaults to the
            module-level ``SYMBOLS_CSV`` when omitted.

    Returns:
        Symbols with surrounding whitespace removed, in first-seen order,
        without duplicates. Missing values and cells that are blank after
        stripping are dropped.

    Raises:
        KeyError: If the CSV has no ``symbol`` column.
    """
    source = SYMBOLS_CSV if csv_path is None else csv_path
    df = pd.read_csv(source)
    # Cast to str so the .str accessor also works when pandas inferred a
    # non-string dtype (e.g. an all-numeric or all-empty column).
    cleaned = df["symbol"].dropna().astype(str).str.strip()
    # Whitespace-only cells survive dropna(); an empty symbol would only
    # produce a pointless failing request later on.
    cleaned = cleaned[cleaned != ""]
    return list(dict.fromkeys(cleaned.tolist()))  # deduplicate while preserving order


def flush_batch(frames: List[pd.DataFrame], output_path: Path) -> int:
    """Append the buffered frames to the CSV at ``output_path`` and empty the buffer.

    Args:
        frames: Buffered data frames; cleared in place after a successful write.
        output_path: Destination CSV. The header row is written only when the
            file does not exist yet.

    Returns:
        Number of rows appended (0 when the buffer is empty, in which case
        nothing is written).
    """
    if not frames:
        return 0

    # Only a brand-new file gets a header; later appends add rows only.
    needs_header = not output_path.exists()
    combined = pd.concat(frames, ignore_index=True)
    row_count = len(combined)

    combined.to_csv(output_path, mode="a", index=False, header=needs_header)
    frames.clear()

    print(f"‚úì Persisted {row_count} rows to {output_path.name}")
    return row_count


def enrich_error_message(err: Exception) -> str:
    """Build a descriptive error string, including a nested retry exception.

    If ``err`` carries a ``last_attempt`` attribute whose ``exception()``
    method yields an exception, that exception is appended as
    ``"last_attempt: ..."``.

    Args:
        err: The exception to describe.

    Returns:
        The message parts joined with ``" | "``. An exception whose ``str()``
        is blank is represented by its class name, so the result is never
        empty and can still be classified by keyword.
    """

    def _describe(exc: BaseException) -> str:
        # Exceptions raised without arguments stringify to "", which would be
        # unclassifiable and useless in the failure log; use the type name.
        text = str(exc)
        return text if text.strip() else type(exc).__name__

    parts = [_describe(err)]
    last_attempt = getattr(err, "last_attempt", None)
    if last_attempt is not None:
        try:
            last_exc = last_attempt.exception()
        except Exception:
            # Best effort only: enrichment must never mask the original error.
            last_exc = None
        if last_exc is not None:
            parts.append(f"last_attempt: {_describe(last_exc)}")
    return " | ".join(parts)


def classify_error(error_msg: str) -> Tuple[bool, bool]:
    """Classify an error message as retriable and/or non-retriable.

    Matching is a case-insensitive substring search against
    ``NON_RETRIABLE_KEYWORDS`` and ``RETRIABLE_KEYWORDS``. Non-retriable
    keywords take precedence. Any message mentioning "http" or "connection"
    is also treated as retriable.

    Args:
        error_msg: Error text to inspect. ``None`` or an empty string is
            accepted and classified as neither category.

    Returns:
        ``(is_retriable, is_non_retriable)``; at most one of the two is True.
    """
    # Guard against a missing message so callers never crash on None.lower().
    error_lower = (error_msg or "").lower()

    if any(kw in error_lower for kw in NON_RETRIABLE_KEYWORDS):
        return False, True

    is_retriable = (
        any(kw in error_lower for kw in RETRIABLE_KEYWORDS)
        or "http" in error_lower
        or "connection" in error_lower
    )
    return is_retriable, False


def fetch_stock_history(
    symbol: str, start_date: str, end_date: str, max_retries: int
) -> "Tuple[pd.DataFrame | None, str | None]":
    """Fetch the price history of one symbol, retrying retriable failures.

    Args:
        symbol: Ticker symbol to request.
        start_date: Value passed as ``start`` to ``Quote.history``.
        end_date: Value passed as ``end`` to ``Quote.history``.
        max_retries: Maximum number of attempts. With a value below 1 no
            request is made at all.

    Returns:
        ``(frame, None)`` on success, where ``frame`` may be empty, or
        ``(None, error_message)`` on failure. The error message is always a
        non-empty string when the frame is ``None``.

    Raises:
        KeyboardInterrupt: Propagated unchanged so the user can abort the run.
    """
    for attempt in range(1, max_retries + 1):
        try:
            quote = Quote(symbol=symbol, source="VCI")
            df = quote.history(start=start_date, end=end_date)
        except KeyboardInterrupt:
            raise
        except SystemExit as sys_exc:
            # SystemExit is intercepted on purpose so that an exit raised
            # during the request does not terminate the whole run; the
            # "systemexit" keyword then makes it retriable.
            error_msg = f"SystemExit: {sys_exc}"
        except Exception as exc:
            error_msg = enrich_error_message(exc)
        else:
            # Defensive: a None result would otherwise be returned as
            # (None, None), which callers interpret as an error without a
            # message. Report it as "no rows" instead.
            if df is None:
                df = pd.DataFrame()
            return df, None

        is_retriable, is_non_retriable = classify_error(error_msg)

        if is_non_retriable:
            return None, error_msg

        if is_retriable and attempt < max_retries:
            # Linear back-off: 60s, 120s, ... capped at five minutes.
            wait_seconds = min(60 * attempt, 300)
            print(f"  ‚ö† Retry {attempt}/{max_retries} after {wait_seconds}s cooldown: {error_msg[:100]}")
            time.sleep(wait_seconds)
            continue

        # Unclassified error, or retriable error on the last attempt.
        return None, error_msg

    # Only reachable when max_retries < 1 (the loop body never ran).
    return None, "Max retries exceeded"


def process_symbols():
    """Fetch price history for every symbol and persist the results.

    Steps:
      1. Delete any previous output and failure files.
      2. Load the symbol list and process it as a queue; symbols that fail
         with a retriable error are appended back to the queue up to
         ``MAX_REQUEUES`` times, after a 300 second cooldown.
      3. Buffer successful frames and append them to ``OUTPUT_PATH`` every
         ``BATCH_SIZE`` symbols.
      4. Write the failed symbols to ``FAILURES_PATH`` and print a summary.

    Buffered rows and the failure log are persisted even when the loop is
    interrupted (e.g. Ctrl-C); the exception is then re-raised and the final
    summary is skipped.
    """
    # Initialize: start from a clean slate so appends never mix with old runs.
    OUTPUT_PATH.parent.mkdir(parents=True, exist_ok=True)
    for path in (OUTPUT_PATH, FAILURES_PATH):
        if path.exists():
            path.unlink()

    # Load symbols
    unique_symbols = load_symbols()
    print(f"Loaded {len(unique_symbols)} unique symbols from {SYMBOLS_CSV}")

    # Initialize state
    symbol_queue = deque(unique_symbols)
    processed_symbols = set()
    price_history_batch = []
    failed_symbols = []
    requeue_counts = {}

    successful_symbols = 0
    total_rows_written = 0
    total_symbols = len(unique_symbols)
    # 1-based position of each symbol in the input list. Used for the progress
    # label so that re-queued symbols do not push the counter past the total.
    positions = {sym: idx for idx, sym in enumerate(unique_symbols, start=1)}

    try:
        # Process queue
        while symbol_queue:
            symbol = symbol_queue.popleft()

            if symbol in processed_symbols:
                continue

            processed_count = positions[symbol]
            requeue_counts.setdefault(symbol, 0)

            # Fetch data
            df_history, error_msg = fetch_stock_history(symbol, START_DATE, END_DATE, MAX_RETRIES)

            # Handle errors
            if df_history is None:
                # A failure without a message must still be classifiable and
                # loggable instead of crashing on None.
                error_msg = error_msg or "Unknown error (no message returned)"
                is_retriable, is_non_retriable = classify_error(error_msg)

                if is_non_retriable:
                    print(f"[{processed_count}/{total_symbols}] {symbol}: ‚úó Skipped (non-retriable: {error_msg[:80]})")
                    failed_symbols.append({"symbol": symbol, "error": error_msg})
                    time.sleep(0.5)
                    continue

                if is_retriable and requeue_counts[symbol] < MAX_REQUEUES:
                    requeue_counts[symbol] += 1
                    wait_seconds = 300
                    print(f"[{processed_count}/{total_symbols}] {symbol}: ‚Üª Re-queuing after {wait_seconds}s cooldown")
                    time.sleep(wait_seconds)
                    symbol_queue.append(symbol)
                    continue

                print(f"[{processed_count}/{total_symbols}] {symbol}: ‚úó Failed after all retries")
                failed_symbols.append({"symbol": symbol, "error": error_msg})
                time.sleep(0.5)
                continue

            # Handle empty data (counted neither as success nor as failure)
            if df_history.empty:
                print(f"[{processed_count}/{total_symbols}] {symbol}: ‚ö† No data returned")
                time.sleep(0.3)
                continue

            # Process successful fetch
            df_history = df_history.assign(symbol=symbol)
            price_history_batch.append(df_history)
            processed_symbols.add(symbol)
            successful_symbols += 1
            print(f"[{processed_count}/{total_symbols}] {symbol}: ‚úì Fetched {len(df_history)} rows")

            # Flush batch if needed
            if len(price_history_batch) >= BATCH_SIZE:
                total_rows_written += flush_batch(price_history_batch, OUTPUT_PATH)

            # Short pause between requests to stay gentle on the data source.
            time.sleep(0.3)
    finally:
        # Final flush: runs on normal completion AND on interruption, so an
        # aborted run keeps the rows and failures collected so far.
        total_rows_written += flush_batch(price_history_batch, OUTPUT_PATH)

        # Save failures
        if failed_symbols:
            df_failures = pd.DataFrame(failed_symbols)
            df_failures.to_csv(FAILURES_PATH, index=False)
            print(f"\n‚ö† Logged {len(df_failures)} failed symbols to {FAILURES_PATH.name}")
        else:
            print("\n‚úì No failures logged")

    # Summary
    print(f"\n{'='*60}")
    print(f"Finished processing {len(unique_symbols)} symbols")
    print(f"  Successful: {successful_symbols}")
    print(f"  Failed: {len(failed_symbols)}")
    print(f"  Total rows written: {total_rows_written}")
    print(f"  Output: {OUTPUT_PATH}")
    print(f"{'='*60}")


# Run the scraping process. This executes immediately when the cell/script runs;
# note that process_symbols() first deletes any existing output and failure files.
process_symbols()

Loaded 1603 unique symbols from ..\..\data\raw\vietnam_stock_symbols.csv
[1/1603] A32: ‚úì Fetched 786 rows
[1/1603] A32: ‚úì Fetched 786 rows
[2/1603] AAA: ‚úì Fetched 786 rows
[2/1603] AAA: ‚úì Fetched 786 rows
[3/1603] AAH: ‚úì Fetched 449 rows
[3/1603] AAH: ‚úì Fetched 449 rows
[4/1603] AAM: ‚úì Fetched 786 rows
[4/1603] AAM: ‚úì Fetched 786 rows
[5/1603] AAS: ‚úì Fetched 786 rows
[5/1603] AAS: ‚úì Fetched 786 rows
[6/1603] AAT: ‚úì Fetched 786 rows
[6/1603] AAT: ‚úì Fetched 786 rows
[7/1603] AAV: ‚úì Fetched 786 rows
[7/1603] AAV: ‚úì Fetched 786 rows
[8/1603] ABB: ‚úì Fetched 786 rows
[8/1603] ABB: ‚úì Fetched 786 rows
[9/1603] ABC: ‚úì Fetched 786 rows
[9/1603] ABC: ‚úì Fetched 786 rows
[10/1603] ABI: ‚úì Fetched 786 rows
[10/1603] ABI: ‚úì Fetched 786 rows
[11/1603] ABR: ‚úì Fetched 786 rows
[11/1603] ABR: ‚úì Fetched 786 rows
[12/1603] ABS: ‚úì Fetched 786 rows
[12/1603] ABS: ‚úì Fetched 786 rows
[13/1603] ABT: ‚úì Fetched 786 rows
[13/1603] ABT: ‚úì Fetched 786 rows
[14/1603]