In [1]:
##############################
# Configuration & base setup #
##############################

import os
from pathlib import Path
from config.loader import load_config

# The project root sits two directory levels above the notebook's working directory;
# config.toml lives directly inside it.
project_root = Path.cwd().parent.parent
config_path = project_root / "config.toml"
CONFIG = load_config(config_path)
# NOTE(review): printing the whole config may write the Blizzard client secret into the
# saved notebook output unless the config model masks it — confirm before sharing.
# NOTE(review): later cells use BLIZZARD_API_CLIENT_ID / BLIZZARD_API_CLIENT_SECRET /
# BLIZZARD_API_REGION / WA_STORAGE_LOCAL_DIR, which no cell defines — TODO derive from CONFIG.
print(CONFIG)

ValidationError: 2 validation errors for Config
blizzard.api_client_id
  Value error, API client ID must be provided [type=value_error, input_value='', input_type=str]
    For further information visit https://errors.pydantic.dev/2.12/v/value_error
blizzard.api_client_secret
  Value error, API client secret must be provided [type=value_error, input_value='', input_type=str]
    For further information visit https://errors.pydantic.dev/2.12/v/value_error

In [None]:
import time
import asyncio
import httpx
from authlib.integrations.httpx_client import AsyncOAuth2Client

class BlizzardAuctionHouseClient:
    """Client for fetching World of Warcraft Auction House data from Battle.net API"""

    def __init__(self, client_id, client_secret, region="us", locale="en_US"):
        """
        Initialize the client with OAuth credentials
        
        Args:
            client_id: Your Battle.net API client ID
            client_secret: Your Battle.net API client secret
            region: API region ('us', 'eu', 'kr', 'tw', 'cn')
        """
        self.client_id = client_id
        self.client_secret = client_secret
        self.region = region.lower()
        
        # Set region-specific endpoints
        self.token_url = f'https://{self.region}.battle.net/oauth/token'
        self.api_base = f'https://{self.region}.api.blizzard.com'

        # Namespaces
        self.namespace_static = f"static-{self.region}"
        self.namespace_dynamic = f"dynamic-{self.region}"

        # Locale
        self.locale = locale

        # HTTP default params
        self.default_params = {'namespace': self.namespace_dynamic, 'locale': self.locale}

    async def __aenter__(self):
        """Async context manager entry"""
        await self._authenticate()
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit"""
        if self.client:
            await self.client.aclose()
    
    async def _authenticate(self):
        """Authenticate and get access token using client credentials flow"""
        try:
            self.client = AsyncOAuth2Client(
                client_id=self.client_id,
                client_secret=self.client_secret,
                token_endpoint=self.token_url
            )
            
            self.token = await self.client.fetch_token(
                self.token_url,
                grant_type='client_credentials'
            )
            print(f"Successfully authenticated. Token expires in {self.token.get('expires_in')} seconds")
        except Exception as e:
            print(f"Authentication failed: {e}")
            raise

    async def _ensure_valid_token(self):
        """Check if token is valid and refresh if necessary"""
        if not self.token or self.token.get('expires_at', 0) <= time.time():
            print("Token expired or missing, re-authenticating...")
            self._authenticate()

    async def get_connected_realms_index(self):
        """
        Get the connected realms index
        
        Returns:
            List of realm IDs
        """
        await self._ensure_valid_token()
        
        url = f"{self.api_base}/data/wow/connected-realm/index"
        
        response = await self.client.get(url, params=self.default_params)
        response.raise_for_status()

        realms_id = []
        for realm in response.json().get('connected_realms', []):
            href = realm.get('href', '')
            realm_id = href.split('/')[-1].split('?')[0] if href else None
            if realm_id and realm_id.isdigit():
                realms_id.append(int(realm_id))

        return realms_id
    
    async def get_connected_realm_details(self, connected_realm_id):
        """Get details for a specific connected realm"""
        await self._ensure_valid_token()
        
        url = f"{self.api_base}/data/wow/connected-realm/{connected_realm_id}"
        
        response = await self.client.get(url, params=self.default_params)
        response.raise_for_status()
        return response.json()

    async def get_auctions(self, connected_realm_id):
        """
        Get auction house data for a specific connected realm
        
        Args:
            connected_realm_id: The connected realm ID
        
        Returns:
            Dictionary containing auction house data
        """
        await self._ensure_valid_token()
        
        url = f"{self.api_base}/data/wow/connected-realm/{connected_realm_id}/auctions"
        
        response = await self.client.get(url, params=self.default_params)
        response.raise_for_status()
        return response.json()
    
    async def get_commodity_auctions(self):
        """
        Get commodity auction house data (items that are region-wide)
        
        Returns:
            Dictionary containing commodity auction data
        """
        await self._ensure_valid_token()
        
        url = f"{self.api_base}/data/wow/auctions/commodities"
        
        response = await self.client.get(url, params=self.default_params)
        response.raise_for_status()
        return response.json()
    
    async def get_multiple_realm_auctions(self, connected_realm_ids, max_concurrent=5):
        """
        Fetch auctions for multiple realms concurrently
        
        Args:
            connected_realm_ids: List of connected realm IDs
            max_concurrent: Maximum number of concurrent requests
        
        Returns:
            Dictionary mapping realm_id to auction data
        """
        semaphore = asyncio.Semaphore(max_concurrent)
        
        async def fetch_with_semaphore(realm_id):
            async with semaphore:
                try:
                    print(f"Fetching auctions for realm {realm_id}...")
                    data = await self.get_auctions(realm_id)
                    print(f"Completed realm {realm_id}: {len(data.get('auctions', []))} auctions")
                    return realm_id, data
                except Exception as e:
                    print(f"Error fetching realm {realm_id}: {e}")
                    return realm_id, None
        
        tasks = [fetch_with_semaphore(realm_id) for realm_id in connected_realm_ids]
        results = await asyncio.gather(*tasks)
        
        return {realm_id: data for realm_id, data in results if data is not None}
    
    async def get_all_realm_details(self, realm_ids, max_concurrent=10):
        """
        Fetch detailed information for multiple realms concurrently
        
        Args:
            realm_ids: List of connected realm IDs
            max_concurrent: Maximum number of concurrent requests
        
        Returns:
            Dictionary mapping realm_id to realm details
        """
        semaphore = asyncio.Semaphore(max_concurrent)
        
        async def fetch_with_semaphore(realm_id):
            async with semaphore:
                try:
                    return realm_id, await self.get_connected_realm_details(realm_id)
                except Exception as e:
                    print(f"Error fetching realm {realm_id} details: {e}")
                    return realm_id, None
        
        tasks = [fetch_with_semaphore(realm_id) for realm_id in realm_ids]
        results = await asyncio.gather(*tasks)
        
        return {realm_id: data for realm_id, data in results if data is not None}

In [None]:
from pathlib import Path
import datetime
import pandas as pd
import json
import os

class ParquetWriter:
    """Handles writing Battle.net API data to Parquet format.

    Files are written under ``<root_dir>/<region>/`` unless an absolute filename
    is given. Timestamps are timezone-aware UTC.
    """

    # Format of the timestamp embedded in per-realm auction filenames; also the
    # format accepted by the ``timestamp_str`` parameters.
    TIMESTAMP_FORMAT = '%Y%m%d_%H%M%S'

    # Target dtypes for auction columns. Pandas nullable extension dtypes ('Int64',
    # ...) are used because unit_price/buyout/bid are read with .get() or only when
    # present, so any of them can be missing on some rows. NumPy int dtypes cannot
    # hold NaN, and nullable dtypes also raise on overflow instead of wrapping.
    _AUCTION_DTYPES = {
        'auction_id': 'Int64',
        'item_id': 'Int32',
        # Widened from int16 to avoid overflow on large stacked postings.
        'quantity': 'Int32',
        'unit_price': 'Int64',
        'buyout': 'Int64',
        'bid': 'Int64',
        'time_left': 'category',
        'connected_realm_id': 'Int16',
    }

    # Target dtypes for realm columns. Realms without details only carry their ID,
    # so the other columns can be missing: 'boolean' keeps a missing has_queue as
    # <NA> (plain bool would turn NaN into True).
    _REALM_DTYPES = {
        'connected_realm_id': 'Int16',
        'has_queue': 'boolean',
        'status_type': 'category',
        'population_type': 'category',
        'num_realms': 'Int8',
    }

    def __init__(self, root_dir, region, compression='snappy'):
        """
        Initialize the Parquet writer and create the ``<root_dir>/<region>`` directory.

        Args:
            root_dir: Directory to save Parquet files
            region: Name of the Blizzard region (used as the sub-directory name)
            compression: Compression algorithm ('snappy', 'gzip', 'brotli', 'zstd')

        Raises:
            FileNotFoundError: If the root_dir does not exist
            NotADirectoryError: If the root_dir is not a directory
            PermissionError: If the root_dir is not writable
        """
        self.region = region
        self.root_dir = Path(root_dir)

        # Validate root_dir exists
        if not self.root_dir.exists():
            raise FileNotFoundError(f"Root data directory does not exist: {self.root_dir}")

        # Validate root_dir is a directory
        if not self.root_dir.is_dir():
            raise NotADirectoryError(f"Root data path is not a directory: {self.root_dir}")

        # Validate root_dir is writable
        if not os.access(self.root_dir, os.W_OK):
            raise PermissionError(f"Root data directory is not writable: {self.root_dir}")

        self.data_dir = self.root_dir / self.region
        self.data_dir.mkdir(mode=0o774, exist_ok=True)

        self.compression = compression

    def save_auctions_to_parquet(self, auction_data, filename, connected_realm_id=None):
        """
        Save auction data to Parquet format with optimized schema

        Args:
            auction_data: Raw auction data from API (dict with an 'auctions' list)
            filename: Output filename (can be Path or string); relative names are
                placed in the region data directory
            connected_realm_id: Optional realm ID to include in data

        Returns:
            DataFrame of the saved data or None if no auctions
        """
        auctions = (auction_data or {}).get('auctions', [])

        if not auctions:
            print("No auctions to save")
            return None

        rows = self._flatten_auctions(auctions, self._utcnow(), connected_realm_id)
        df = self._optimize_auction_dtypes(pd.DataFrame(rows))

        filepath = self._write_parquet(df, filename)

        file_size = filepath.stat().st_size / (1024 * 1024)  # Size in MB
        print(f"Saved {len(df)} auctions to {filepath} ({file_size:.2f} MB)")

        return df

    def save_connected_realms_to_parquet(self, realms_id, filename, realm_details=None):
        """
        Save connected realms data to Parquet format

        Args:
            realms_id: List of connected realm IDs
            filename: Output filename (can be Path or string)
            realm_details: Optional dictionary of realm_id -> detailed realm info;
                realms missing from it are saved with only their ID

        Returns:
            DataFrame of the saved data or None if no realms
        """
        if not realms_id:
            print("No connected realms to save")
            return None

        details = realm_details or {}
        realm_list = [self._flatten_realm(realm_id, details.get(realm_id)) for realm_id in realms_id]

        df = self._optimize_realm_dtypes(pd.DataFrame(realm_list))

        filepath = self._write_parquet(df, filename)
        print(f"Saved {len(df)} connected realms to {filepath}")

        return df

    def save_multiple_realm_auctions_to_parquet(self, auctions_dict, timestamp_str=None):
        """
        Save auction data for multiple realms to separate Parquet files

        Args:
            auctions_dict: Dictionary mapping realm_id to auction data
            timestamp_str: Optional timestamp string for filenames (defaults to the
                current UTC time in ``TIMESTAMP_FORMAT``)

        Returns:
            Dictionary mapping realm_id to DataFrame (realms with no auctions omitted)
        """
        if timestamp_str is None:
            timestamp_str = self._utcnow().strftime(self.TIMESTAMP_FORMAT)

        dataframes = {}
        for realm_id, auction_data in auctions_dict.items():
            filename = f'auctions_realm_{realm_id}_{timestamp_str}.parquet'
            df = self.save_auctions_to_parquet(auction_data, filename, connected_realm_id=realm_id)
            if df is not None:
                dataframes[realm_id] = df

        return dataframes

    def save_combined_realm_auctions_to_parquet(self, auctions_dict, filename, timestamp_str=None):
        """
        Save auction data for multiple realms to a single Parquet file

        Args:
            auctions_dict: Dictionary mapping realm_id to auction data
            filename: Output filename
            timestamp_str: Optional timestamp (``TIMESTAMP_FORMAT``, interpreted as
                UTC) used as fetch_timestamp for every row; defaults to now

        Returns:
            Combined DataFrame, or None if no realm had auctions

        Raises:
            ValueError: if timestamp_str does not match ``TIMESTAMP_FORMAT``
        """
        # One timestamp for the whole batch so every realm in the file agrees.
        timestamp = self._resolve_timestamp(timestamp_str)

        all_rows = []
        realm_count = 0
        for realm_id, auction_data in auctions_dict.items():
            auctions = (auction_data or {}).get('auctions', [])
            if not auctions:
                continue
            all_rows.extend(self._flatten_auctions(auctions, timestamp, realm_id))
            realm_count += 1

        if not all_rows:
            print("No auctions to save")
            return None

        # Build once from all records instead of concatenating per-realm frames.
        combined_df = self._optimize_auction_dtypes(pd.DataFrame(all_rows))

        filepath = self._write_parquet(combined_df, filename)

        file_size = filepath.stat().st_size / (1024 * 1024)
        print(f"Saved {len(combined_df)} total auctions from {realm_count} realms to {filepath} ({file_size:.2f} MB)")

        return combined_df

    def save_to_json(self, data, filename):
        """Save data to JSON file (for reference/debugging)"""
        filepath = self._resolve_path(filename)
        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=2, ensure_ascii=False)
        print(f"Data saved to {filepath}")

    @staticmethod
    def _utcnow():
        """Current time as a timezone-aware UTC datetime."""
        return datetime.datetime.now(datetime.timezone.utc)

    def _resolve_timestamp(self, timestamp_str=None):
        """Parse ``timestamp_str`` (``TIMESTAMP_FORMAT``, UTC) or return now if None."""
        if timestamp_str is None:
            return self._utcnow()
        parsed = datetime.datetime.strptime(timestamp_str, self.TIMESTAMP_FORMAT)
        return parsed.replace(tzinfo=datetime.timezone.utc)

    def _resolve_path(self, filename):
        """Absolute filenames are used as-is; relative ones go under data_dir."""
        path = Path(filename)
        return path if path.is_absolute() else self.data_dir / path

    def _write_parquet(self, df, filename):
        """Write ``df`` to Parquet at the resolved path and return that path."""
        filepath = self._resolve_path(filename)
        df.to_parquet(filepath, engine='pyarrow', compression=self.compression, index=False)
        return filepath

    @staticmethod
    def _flatten_auctions(auctions, timestamp, connected_realm_id=None):
        """Turn raw API auction dicts into one flat row dict per auction."""
        rows = []
        for auction in auctions:
            item = auction.get('item', {})
            row = {
                'fetch_timestamp': timestamp,
                'auction_id': auction.get('id'),
                'item_id': item.get('id'),
                'quantity': auction.get('quantity', 1),
                'unit_price': auction.get('unit_price'),
                'buyout': auction.get('buyout'),
                'time_left': auction.get('time_left'),
            }

            if connected_realm_id is not None:
                row['connected_realm_id'] = connected_realm_id

            # Item bonuses are stored as a comma-separated string.
            if 'bonus_lists' in item:
                row['bonus_lists'] = ','.join(map(str, item['bonus_lists']))

            # Each modifier becomes its own column: modifier_<type> = value.
            for mod in item.get('modifiers', []):
                row[f"modifier_{mod.get('type')}"] = mod.get('value')

            # Bid may not be present for all auctions.
            if 'bid' in auction:
                row['bid'] = auction['bid']

            rows.append(row)
        return rows

    @staticmethod
    def _flatten_realm(realm_id, detail):
        """Build one row for a connected realm; only its ID when details are missing."""
        if not detail:
            return {'connected_realm_id': realm_id}
        realms = detail.get('realms', [])
        return {
            'connected_realm_id': realm_id,
            'has_queue': detail.get('has_queue', False),
            'status_type': detail.get('status', {}).get('type'),
            'population_type': detail.get('population', {}).get('type'),
            'num_realms': len(realms),
            # Comma-separated when a connected realm groups several realms.
            'realm_names': ', '.join(r.get('name', '') for r in realms),
            'realm_slugs': ', '.join(r.get('slug', '') for r in realms),
        }

    def _optimize_auction_dtypes(self, df):
        """Return ``df`` cast to the compact auction dtypes (present columns only)."""
        return df.astype({col: dtype for col, dtype in self._AUCTION_DTYPES.items() if col in df.columns})

    def _optimize_realm_dtypes(self, df):
        """Return ``df`` cast to the compact realm dtypes (present columns only)."""
        return df.astype({col: dtype for col, dtype in self._REALM_DTYPES.items() if col in df.columns})

In [None]:
# Main code

writer = ParquetWriter(root_dir=WA_STORAGE_LOCAL_DIR, region=BLIZZARD_API_REGION, compression='snappy')

async def main():
    
    # Use async context manager
    async with BlizzardAuctionHouseClient(BLIZZARD_API_CLIENT_ID, BLIZZARD_API_CLIENT_SECRET, BLIZZARD_API_REGION) as client:
        
        print("\nFetching connected realms...")
        realm_ids = await client.get_connected_realms_index()
        print(f"Found {len(realm_ids)} connected realms")
        realm_details = await client.get_all_realm_details(realm_ids)

        # Store connected realms
        df_realms = writer.save_connected_realms_to_parquet(
            realm_ids, 
            'connected_realms.parquet',
            realm_details=realm_details
        )
        
# Run
await main()