## Installation and Setup

In [None]:
"""
## Environment Setup
Install the third-party libraries used by this notebook. The list also covers
scraping/NLP packages that the import cell below does not load.
Only needed once per environment: uncomment the pip line to run the install.
"""
# IPython shell escape; remove the leading '#' to install:
#!pip install -q aiohttp aiofiles pandas tqdm psutil requests beautifulsoup4 scrapy spacy nltk transformers datasets

_READY_MESSAGE = " Ready to import libraries"
print(_READY_MESSAGE)


## Import Libraries

In [None]:
"""
## Import Required Libraries
Import all necessary libraries for asynchronous API calls, data processing,
and checkpoint management.
"""

import asyncio
import aiohttp
import aiofiles
import pandas as pd
import json
import time
import logging
from pathlib import Path
from datetime import datetime
from typing import Dict, List, Optional, Tuple
from tqdm.asyncio import tqdm
import sys
import psutil
import os

print(" All libraries imported successfully")

 All libraries imported successfully


## Configuration and Logging Setup

In [None]:
"""
## Configuration Settings
Configure all pipeline parameters including rate limits, retry settings,
and file paths. Adjust these based on your needs and API limits.
"""

# ============= CONFIGURATION =============
CONFIG = {
    # Request pacing
    'MAX_CONCURRENT_REQUESTS': 3,  # Concurrency cap (not a per-second rate); reduced for 8GB RAM
    'REQUEST_DELAY': 0.5,  # Minimum delay between request starts, in seconds

    # Retry settings
    'MAX_RETRIES': 5,  # Total attempts per request, including the first one
    'INITIAL_BACKOFF': 2,  # Initial backoff in seconds; doubles on each attempt
    'MAX_BACKOFF': 60,  # Maximum backoff in seconds

    # Memory management
    'CHUNK_SIZE': 100,  # Flush buffered records to disk once at least N are held
    'BATCH_SIZE': 50,  # Emit a debug progress marker every N pages

    # File paths
    'CHECKPOINT_FILE': 'pipeline_checkpoint.json',
    'LOG_FILE': 'pipeline_execution.log',
    'ERROR_LOG': 'pipeline_errors.log',
    'TEMP_DIR': 'temp_data',
    'FINAL_OUTPUT': 'customer_insurance_reviews_final.csv',
}

# Create necessary directories
Path(CONFIG['TEMP_DIR']).mkdir(exist_ok=True)

# ============= LOGGING SETUP =============
_LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'

# basicConfig is a no-op once the root logger has handlers (e.g. when this cell
# is re-run). Checking first avoids opening a FileHandler that would be thrown
# away while still holding an open file descriptor.
if not logging.getLogger().handlers:
    logging.basicConfig(
        level=logging.INFO,
        format=_LOG_FORMAT,
        handlers=[
            logging.FileHandler(CONFIG['LOG_FILE']),
            logging.StreamHandler(sys.stdout)
        ]
    )

logger = logging.getLogger(__name__)

# Separate error logger. Its records also propagate to the root handlers above.
error_logger = logging.getLogger('errors')

# The handler is tagged with a name so re-running this notebook cell reuses it
# instead of attaching a second copy (which would duplicate every error line).
_ERROR_HANDLER_NAME = 'pipeline_error_file'
error_handler = next(
    (h for h in error_logger.handlers if h.get_name() == _ERROR_HANDLER_NAME),
    None,
)
if error_handler is None:
    error_handler = logging.FileHandler(CONFIG['ERROR_LOG'])
    error_handler.set_name(_ERROR_HANDLER_NAME)
    error_handler.setLevel(logging.ERROR)
    # Without a formatter the error file would contain bare messages with no timestamp
    error_handler.setFormatter(logging.Formatter(_LOG_FORMAT))
    error_logger.addHandler(error_handler)

logger.info(" Configuration and logging initialized")
print(f" Configuration loaded - Max concurrent requests: {CONFIG['MAX_CONCURRENT_REQUESTS']}")
print(f" Checkpoint file: {CONFIG['CHECKPOINT_FILE']}")
print(f" Temporary data directory: {CONFIG['TEMP_DIR']}")

2025-10-26 15:35:49,196 - INFO -  Configuration and logging initialized
 Configuration loaded - Max concurrent requests: 3
 Checkpoint file: pipeline_checkpoint.json
 Temporary data directory: temp_data


## API Endpoints Configuration

In [None]:
"""
## API Endpoints Configuration
Every insurer to scrape, as (reviews endpoint URL, business identifier) pairs.
All endpoints live under the same Hellopeter consumer API path, so only the
per-business slug is listed. Several slugs intentionally map to the same
identifier (e.g. miway / miwaylife / blink -> 'miway').
"""

_HELLOPETER_BUSINESS_BASE = 'https://api.hellopeter.com/consumer/business'

# (Hellopeter business slug, identifier written to the Business_Name column)
_BUSINESS_SLUGS = [
    ('king-price-insurance', 'king_price_insurance'),
    ('momentum-insure', 'momentum'),
    ('miway', 'miway'),
    ('miwaylife', 'miway'),
    ('blink', 'miway'),
    ('auto-and-general', 'auto_and_general'),
    ('bidvest-insurance', 'bidvest_insurance'),
    ('budget-insurance', 'budget_insurance'),
    ('dotsurecoza', 'dotsure'),
    ('naked-insurance', 'naked_insurance'),
    ('first-for-women-insurance', 'first_for_women_insurance'),
    ('santam', 'santam'),
    ('discovery-insure', 'discovery'),
    ('iwyze', 'old_mutual'),
    ('idirect', 'absa_insurance'),
    ('mutual-federal', 'old_mutual'),
    ('1life', '1life'),
    ('indiefin', 'sanlam'),
    ('outsurance', 'outsurance'),
    ('pps', 'pps'),
    ('sa-underwriting-agencies-pty-ltd', 'sau'),
    ('hollard-insurance', 'hollard_insurance'),
    ('psg-insure', 'psg_insure'),
    ('assupol-life', 'assupol'),
    ('metropolitan-life', 'metropolitan'),
    ('platinum-life', 'platinum_life'),
    ('liberty-life', 'liberty'),
    ('clientele-life', 'clientele'),
    ('alllife', 'alllife'),
    ('better-sure-financial-consultants', 'better_sure_financial_consultants'),
    ('safrican-insurance-company-1', 'safrican_insurance'),
    ('brightrock', 'brightrock'),
    ('icebolethu-funerals', 'icebolethu_funerals'),
    ('tendahealth', 'tendahealth'),
    ('simply-financial-services', 'simply_financial_services'),
    ('different-life', 'different_life'),
    ('prime-meridian-direct', 'prime_meridian_direct'),
    ('oneplan-car-and-household', 'oneplan_car_and_household'),
    ('pineapple-insurance', 'pineapple_insurance'),
    ('m-sure-financial-services', 'm_sure_financial_services'),
    ('bsure-insurance-advisors', 'bsure_insurance_advisors'),
    ('integrisure', 'integrisure'),
    ('aa-warranties', 'aa_warranties'),
    ('hippocoza', 'hippocoza'),
    ('cib-insurance', 'cib_insurance'),
    ('cubix-solutions', 'cubix_solutions'),
    ('sohva', 'sohva'),
    ('elixihealth', 'elixihealth'),
    ('total-risk-administrators', 'total_risk_administrators'),
    ('zestlife', 'zestlife'),
    ('medipet-animal-health-insurance-brokers-pty-ltd', 'medipet_animal_health'),
    ('oobainsure', 'oobainsure'),
]

INSURANCE_APIS = [
    (f'{_HELLOPETER_BUSINESS_BASE}/{slug}/reviews', identifier)
    for slug, identifier in _BUSINESS_SLUGS
]

_api_count = len(INSURANCE_APIS)
logger.info(f" Configured {_api_count} insurance APIs")
print(f" Total APIs to process: {_api_count}")

2025-10-26 15:35:56,006 - INFO -  Configured 52 insurance APIs
 Total APIs to process: 52


## Checkpoint Management Functions

In [None]:
"""
## Checkpoint Management
Functions to save and load pipeline progress. This allows the pipeline
to resume from where it left off if interrupted.
"""

class CheckpointManager:
    """Persist pipeline progress in a JSON file so an interrupted run can resume.

    ``state`` holds:
      - ``current_api_index``: index of the next API to process. It advances on
        both success and failure, so failed APIs are not retried on resume.
      - ``completed_apis`` / ``failed_apis``: lists of per-API result dicts.
      - ``total_records_processed``: running sum of records from completed APIs.
      - ``start_time`` / ``last_update``: ISO-8601 local (naive) timestamps.
    """

    def __init__(self, checkpoint_file: str):
        self.checkpoint_file = checkpoint_file
        self.state = self._load_checkpoint()

    def _load_checkpoint(self) -> Dict:
        """Return the saved state, or a fresh one if the file is missing or unreadable.

        Keys absent from an older or hand-edited file are filled in from the
        empty template, so later lookups such as ``state['failed_apis']`` cannot
        raise KeyError.
        """
        if not Path(self.checkpoint_file).exists():
            return self._create_empty_checkpoint()
        try:
            with open(self.checkpoint_file, 'r', encoding='utf-8') as f:
                loaded = json.load(f)
        except (OSError, ValueError) as e:  # ValueError covers json.JSONDecodeError
            logger.warning(f"Failed to load checkpoint: {e}. Starting fresh.")
            return self._create_empty_checkpoint()
        if not isinstance(loaded, dict):
            logger.warning(
                f"Failed to load checkpoint: expected a JSON object, got "
                f"{type(loaded).__name__}. Starting fresh."
            )
            return self._create_empty_checkpoint()
        state = {**self._create_empty_checkpoint(), **loaded}
        logger.info(f" Loaded checkpoint from {self.checkpoint_file}")
        return state

    def _create_empty_checkpoint(self) -> Dict:
        """Create empty checkpoint structure"""
        return {
            'current_api_index': 0,
            'completed_apis': [],
            'failed_apis': [],
            'last_update': None,
            'total_records_processed': 0,
            'start_time': datetime.now().isoformat()
        }

    def save_checkpoint(self):
        """Write the current state to disk, stamping ``last_update``.

        The state is written to a sibling ``.tmp`` file and then moved into place
        with ``os.replace``. A crash mid-write therefore leaves the previous
        checkpoint intact rather than a truncated file, which would otherwise make
        the next load restart the whole pipeline. Failures are logged, not raised.
        """
        self.state['last_update'] = datetime.now().isoformat()
        tmp_path = f"{self.checkpoint_file}.tmp"
        try:
            with open(tmp_path, 'w', encoding='utf-8') as f:
                json.dump(self.state, f, indent=2)
            os.replace(tmp_path, self.checkpoint_file)
            logger.debug(f"Checkpoint saved: API {self.state['current_api_index']}")
        except (OSError, TypeError, ValueError) as e:
            logger.error(f"Failed to save checkpoint: {e}")

    def mark_api_complete(self, api_name: str, records_count: int):
        """Record a successful API, advance to the next index, and persist."""
        self.state['completed_apis'].append({
            'name': api_name,
            'records': records_count,
            'timestamp': datetime.now().isoformat()
        })
        self.state['current_api_index'] += 1
        self.state['total_records_processed'] += records_count
        self.save_checkpoint()

    def mark_api_failed(self, api_name: str, error: str):
        """Record a failed API, advance past it (no automatic retry), and persist."""
        self.state['failed_apis'].append({
            'name': api_name,
            'error': str(error),
            'timestamp': datetime.now().isoformat()
        })
        self.state['current_api_index'] += 1
        self.save_checkpoint()

    def get_resume_index(self) -> int:
        """Index into the API list of the next API that has not been attempted."""
        return self.state['current_api_index']

    def reset(self):
        """Discard all progress, replace it with a fresh state, and persist it."""
        self.state = self._create_empty_checkpoint()
        self.save_checkpoint()
        logger.info("Checkpoint reset")

# Initialize checkpoint manager
checkpoint = CheckpointManager(CONFIG['CHECKPOINT_FILE'])
logger.info(" Checkpoint manager initialized")
print(f" Will resume from API index: {checkpoint.get_resume_index()}")

## Rate Limiter Class

In [None]:
"""
## Rate Limiter
Spaces API requests by a minimum delay so the pipeline does not overwhelm the
API or trip its rate limits.
"""

class RateLimiter:
    """Minimum-interval limiter for API requests (not a token bucket).

    ``acquire()`` returns once at least ``delay`` seconds have passed since the
    previous caller was let through. Permission is released as soon as
    ``acquire()`` returns, so the semaphore only bounds how many callers wait
    at once; it does not bound in-flight requests.
    """

    def __init__(self, max_requests: int, delay: float):
        self.max_requests = max_requests
        self.delay = delay
        self.semaphore = asyncio.Semaphore(max_requests)
        # time.monotonic() of the last request let through; -inf lets the first one pass immediately
        self.last_request_time = float('-inf')
        # Serializes the read -> sleep -> update sequence. Without it, concurrent
        # callers read the same last_request_time, sleep the same amount, and then
        # start together, defeating the spacing.
        self._lock = asyncio.Lock()

    async def acquire(self):
        """Wait until a request may start, keeping starts >= ``delay`` seconds apart."""
        async with self.semaphore:
            async with self._lock:
                # Monotonic clock: immune to wall-clock adjustments (NTP, DST)
                wait = self.delay - (time.monotonic() - self.last_request_time)
                if wait > 0:
                    await asyncio.sleep(wait)
                self.last_request_time = time.monotonic()

# One limiter shared by every request made through fetch_with_retry
rate_limiter = RateLimiter(
    max_requests=CONFIG['MAX_CONCURRENT_REQUESTS'],
    delay=CONFIG['REQUEST_DELAY'],
)
logger.info(" Rate limiter initialized")


## Async HTTP Fetcher with Retry Logic

In [None]:
"""
## Async HTTP Fetcher
Asynchronous HTTP request handler with exponential backoff retry logic.
Handles network errors, timeouts, server errors, and rate limiting.
"""

# HTTP statuses treated as transient server-side failures that are worth retrying.
_RETRYABLE_SERVER_STATUSES = frozenset({500, 502, 503, 504})


def _retry_after_seconds(header_value: Optional[str]) -> Optional[float]:
    """Parse a numeric ``Retry-After`` header value into seconds.

    Returns None when the header is absent, negative, or in HTTP-date form
    (not handled here), in which case the caller keeps its exponential backoff.
    """
    if not header_value:
        return None
    try:
        seconds = float(header_value)
    except ValueError:
        return None
    return seconds if seconds >= 0 else None


async def fetch_with_retry(
    session: aiohttp.ClientSession,
    url: str,
    business_name: str,
    attempt: int = 0
) -> Optional[Dict]:
    """
    Fetch JSON from ``url``, retrying transient failures with exponential backoff.

    Retried:
      - HTTP 429, waiting for a numeric ``Retry-After`` when the server sends one
        and the exponential backoff otherwise;
      - HTTP 500/502/503/504;
      - timeouts;
      - any other exception raised while requesting or decoding.
    Any other non-200 status is treated as permanent and returns None at once.
    The wait is ``INITIAL_BACKOFF * 2**attempt`` seconds, capped at ``MAX_BACKOFF``.

    Args:
        session: aiohttp client session
        url: API endpoint URL
        business_name: Business identifier for logging
        attempt: Attempt number to start counting from (0 for a fresh call)

    Returns:
        Decoded JSON body, or None on a permanent error or once
        ``CONFIG['MAX_RETRIES']`` attempts have been used.
    """
    max_retries = CONFIG['MAX_RETRIES']

    while attempt < max_retries:
        backoff = min(CONFIG['MAX_BACKOFF'], CONFIG['INITIAL_BACKOFF'] * (2 ** attempt))
        try:
            # Apply rate limiting
            await rate_limiter.acquire()

            async with session.get(url, timeout=aiohttp.ClientTimeout(total=30)) as response:
                if response.status == 200:
                    return await response.json()
                if response.status == 429:  # Too many requests
                    retry_after = _retry_after_seconds(response.headers.get('Retry-After'))
                    if retry_after is not None:
                        backoff = min(CONFIG['MAX_BACKOFF'], retry_after)
                    logger.warning(f"Rate limited for {business_name}. Retrying in {backoff}s (attempt {attempt + 1}/{max_retries})")
                elif response.status in _RETRYABLE_SERVER_STATUSES:
                    logger.warning(f"HTTP {response.status} for {business_name}. Retrying in {backoff}s (attempt {attempt + 1}/{max_retries})")
                else:
                    logger.warning(f"HTTP {response.status} for {business_name}")
                    return None

        except asyncio.TimeoutError:
            logger.warning(f"Timeout for {business_name}. Retrying in {backoff}s (attempt {attempt + 1}/{max_retries})")

        except Exception as e:
            logger.warning(f"Error for {business_name}: {str(e)}. Retrying in {backoff}s (attempt {attempt + 1}/{max_retries})")
            error_logger.error(f"{business_name} | {url} | {str(e)}")

        # Sleep outside the `async with` so the connection is released while waiting
        await asyncio.sleep(backoff)
        attempt += 1

    error_msg = f"Max retries ({max_retries}) reached for {business_name}"
    logger.error(error_msg)
    error_logger.error(f"{business_name} | {url} | {error_msg}")
    return None

logger.info(" Async fetcher with retry logic defined")


## Data Storage Functions

In [None]:
"""
## Data Storage Functions
Functions to save data to disk in chunks to minimize memory usage.
Uses JSON format for temporary storage and CSV for final output.
"""

async def save_chunk_to_disk(data: List[Dict], business_name: str, chunk_id: int):
    """
    Save a chunk of review records to ``TEMP_DIR/<business_name>_chunk_<chunk_id>.json``.

    The JSON is written to a ``.json.tmp`` sibling and then moved into place with
    ``os.replace``. An interrupted write therefore never leaves a truncated chunk
    that would later fail to load, and the ``*.json`` glob used for loading never
    sees the partial file.

    Args:
        data: List of review records
        business_name: Business identifier (file-name prefix)
        chunk_id: Identifier interpolated into the file name

    Raises:
        Whatever the filesystem raises; the error is logged first.
    """
    filename = Path(CONFIG['TEMP_DIR']) / f"{business_name}_chunk_{chunk_id}.json"
    tmp_filename = filename.with_name(filename.name + '.tmp')
    try:
        # The temp directory may have been removed after the config cell created it
        filename.parent.mkdir(parents=True, exist_ok=True)
        async with aiofiles.open(tmp_filename, 'w', encoding='utf-8') as f:
            await f.write(json.dumps(data))
        os.replace(tmp_filename, filename)
        logger.debug(f"Saved chunk {chunk_id} for {business_name} ({len(data)} records)")
    except Exception as e:
        logger.error(f"Failed to save chunk {chunk_id} for {business_name}: {e}")
        raise

def load_temp_files_for_business(business_name: str) -> pd.DataFrame:
    """
    Load every temporary chunk for a business into one DataFrame.

    Chunks are read in natural order, so ``_chunk_2`` precedes ``_chunk_10``.
    A chunk that cannot be read or parsed is logged and skipped.

    Args:
        business_name: Business identifier (chunk file-name prefix)

    Returns:
        Combined records with a ``Business_Name`` column set to ``business_name``;
        an empty DataFrame when no chunk files exist.
    """
    import re  # local: only needed for the natural-sort key

    temp_dir = Path(CONFIG['TEMP_DIR'])
    pattern = f"{business_name}_chunk_*.json"

    def _natural_key(path: Path):
        # re.split with a capture group alternates text / digit-run pieces;
        # odd positions are the digit runs, compared numerically.
        parts = re.split(r'(\d+)', path.name)
        return [int(part) if i % 2 else part for i, part in enumerate(parts)]

    chunk_files = sorted(temp_dir.glob(pattern), key=_natural_key)

    if not chunk_files:
        return pd.DataFrame()

    all_data = []
    for chunk_file in chunk_files:
        try:
            with open(chunk_file, 'r', encoding='utf-8') as f:
                data = json.load(f)
            all_data.extend(data)
        except (OSError, ValueError) as e:  # ValueError covers json.JSONDecodeError
            logger.error(f"Failed to load {chunk_file}: {e}")

    df = pd.DataFrame(all_data)
    df['Business_Name'] = business_name
    return df

logger.info(" Data storage functions defined")

## Single API Processing Function

In [None]:
"""
## Single API Processor
Main function to process a single insurance company API.
Handles pagination, chunking, and saves data incrementally to disk.
"""

async def process_single_api(
    session: aiohttp.ClientSession,
    url: str,
    business_name: str,
    api_index: int,
    total_apis: int
) -> Tuple[bool, int]:
    """
    Fetch every review page for one API endpoint and spill the records to disk.

    The first request reads the paginator metadata (``last_page`` and
    ``next_page_url``). Pages 1..last_page are then fetched one by one.
    Records are buffered and written with ``save_chunk_to_disk`` whenever the
    buffer reaches ``CONFIG['CHUNK_SIZE']``, and once more for the remainder.

    Args:
        session: aiohttp client session
        url: API endpoint URL
        business_name: Business identifier (chunk-file prefix). Several APIs may
            share one identifier.
        api_index: Index of this API in the configured list. It also namespaces
            chunk file names, so APIs sharing an identifier never overwrite
            each other's chunks.
        total_apis: Total number of APIs (progress logging only)

    Returns:
        Tuple of (success, records_count). success is False when the first page
        or its pagination metadata cannot be obtained, or an unexpected error
        occurs. Individual pages that return no data are logged and skipped.
    """
    divider = '=' * 60
    logger.info(divider)
    logger.info(f"Processing API {api_index + 1}/{total_apis}: {business_name}")
    logger.info(divider)

    def chunk_id(counter: int) -> str:
        # Zero padding keeps lexicographic file order equal to numeric order
        return f"{api_index:03d}_{counter:05d}"

    try:
        # Fetch first page to get pagination info
        init_data = await fetch_with_retry(session, url, business_name)
        if not init_data:
            logger.error(f"Failed to fetch initial data for {business_name}")
            return False, 0

        if 'last_page' not in init_data:
            logger.error(f"No pagination info found for {business_name}")
            return False, 0

        try:
            last_page = int(init_data['last_page'])
            logger.info(f"Found {last_page} pages for {business_name}")
        except (ValueError, TypeError):
            logger.error(f"Invalid last_page value for {business_name}")
            return False, 0

        # Paginator base path without its query string. For a single-page result
        # `next_page_url` is present but null, so fall back to the endpoint URL
        # instead of failing on None.
        base_url = (init_data.get('next_page_url') or url).split('?', 1)[0]

        buffered = []
        chunk_counter = 0
        total_records = 0

        # The context manager closes the progress bar even if a page raises
        with tqdm(total=last_page, desc=f"{business_name}", unit="page") as pbar:
            for page_num in range(1, last_page + 1):
                page_url = f"{base_url}?page={page_num}"
                page_data = await fetch_with_retry(session, page_url, business_name)

                if page_data and 'data' in page_data:
                    records = page_data['data']
                    buffered.extend(records)
                    total_records += len(records)

                    # Flush to disk once the buffer reaches CHUNK_SIZE to bound memory
                    if len(buffered) >= CONFIG['CHUNK_SIZE']:
                        await save_chunk_to_disk(buffered, business_name, chunk_id(chunk_counter))
                        chunk_counter += 1
                        buffered = []
                else:
                    logger.warning(f"No data in page {page_num} for {business_name}")

                pbar.update(1)

                # Progress marker only; memory is bounded by the CHUNK_SIZE flush above
                if page_num % CONFIG['BATCH_SIZE'] == 0:
                    logger.debug(f"Batch checkpoint at page {page_num}")

        # Save remaining records
        if buffered:
            await save_chunk_to_disk(buffered, business_name, chunk_id(chunk_counter))

        logger.info(f" Completed {business_name}: {total_records} total records")
        return True, total_records

    except Exception as e:
        logger.error(f"✗ Failed processing {business_name}: {str(e)}")
        error_logger.error(f"{business_name} | Fatal error | {str(e)}")
        return False, 0

logger.info(" Single API processor defined")

## Main Pipeline Orchestrator

In [None]:
"""
## Main Pipeline Orchestrator
Walk the configured API list one endpoint at a time, starting from the
checkpointed index and recording each outcome before moving on.
"""

async def run_pipeline():
    """
    Process every API from the checkpoint's resume index to the end, in order.

    All requests share one aiohttp session. After each API, its result is
    recorded through ``checkpoint.mark_api_complete`` or
    ``checkpoint.mark_api_failed``, both of which advance the resume index.
    A 2-second pause separates consecutive APIs. A timing and totals summary
    is logged at the end.
    """
    started_at = time.time()
    first_index = checkpoint.get_resume_index()
    api_total = len(INSURANCE_APIS)
    hash_rule = '#' * 60
    star_rule = '*' * 60

    logger.info(hash_rule)
    logger.info("STARTING DATA COLLECTION PIPELINE")
    logger.info(f"Total APIs: {api_total}")
    logger.info(f"Resuming from index: {first_index}")
    logger.info(hash_rule)

    # Connector pool size mirrors the concurrency setting; the 60s session
    # timeout is an outer bound on top of the per-request timeout.
    tcp_connector = aiohttp.TCPConnector(limit=CONFIG['MAX_CONCURRENT_REQUESTS'])
    session_timeout = aiohttp.ClientTimeout(total=60)

    async with aiohttp.ClientSession(connector=tcp_connector, timeout=session_timeout) as session:
        for position in range(first_index, api_total):
            endpoint, business = INSURANCE_APIS[position]

            logger.info(f"\n{star_rule}")
            logger.info(f"API {position + 1}/{api_total}: {business}")
            logger.info(star_rule)

            ok, n_records = await process_single_api(
                session, endpoint, business, position, api_total
            )

            if not ok:
                checkpoint.mark_api_failed(business, "Processing failed")
                logger.warning(f"✗ Marked {business} as failed, continuing...")
            else:
                checkpoint.mark_api_complete(business, n_records)
                logger.info(f" Checkpoint saved for {business}")

            # Short breather between endpoints
            await asyncio.sleep(2)

    duration = time.time() - started_at
    summary = checkpoint.state
    logger.info(f"\n{hash_rule}")
    logger.info("PIPELINE COMPLETED")
    logger.info(f"Total time: {duration:.2f} seconds ({duration/60:.2f} minutes)")
    logger.info(f"Total records: {summary['total_records_processed']}")
    logger.info(f"Completed APIs: {len(summary['completed_apis'])}")
    logger.info(f"Failed APIs: {len(summary['failed_apis'])}")
    logger.info(hash_rule)

logger.info(" Pipeline orchestrator defined")

## Execute the Pipeline

In [None]:
"""
## Execute Pipeline
Run the main data collection pipeline.
This cell executes the asynchronous pipeline.
"""

# Run the pipeline
print("\n" + "="*60)
print("STARTING DATA COLLECTION PIPELINE")
print("="*60 + "\n")

# Execute async pipeline
await run_pipeline()

print("\n" + "="*60)
print("DATA COLLECTION COMPLETED")
print("="*60 + "\n")


## Consolidate Data from Temporary Files

In [None]:
"""
## Consolidate Data
Merge all temporary JSON chunks into a single CSV file.
This step processes data in batches to minimize memory usage.
"""

def consolidate_all_data():
    """
    Consolidate all temporary data files into final CSV
    Uses chunked processing to minimize memory usage
    """
    logger.info("Starting data consolidation...")
    print("\n" + "="*60)
    print("CONSOLIDATING DATA")
    print("="*60 + "\n")
    
    final_output = Path(CONFIG['FINAL_OUTPUT'])
    temp_dir = Path(CONFIG['TEMP_DIR'])
    
    # Get all unique business names from checkpoint
    completed_apis = [api['name'] for api in checkpoint.state['completed_apis']]
    
    if not completed_apis:
        logger.warning("No completed APIs found to consolidate")
        return
    
    # Write header
    first_df = load_temp_files_for_business(completed_apis[0])
    first_df.to_csv(final_output, index=False, mode='w')
    logger.info(f" Wrote header and data for {completed_apis[0]}")
    
    # Append remaining businesses
    for business_name in tqdm(completed_apis[1:], desc="Consolidating"):
        try:
            df = load_temp_files_for_business(business_name)
            if not df.empty:
                df.to_csv(final_output, index=False, mode='a', header=False)
                logger.info(f" Appended {len(df)} records for {business_name}")
            else:
                logger.warning(f"No data found for {business_name}")
        except Exception as e:
            logger.error(f"Failed to consolidate {business_name}: {e}")
    
    # Get final file size and record count
    final_df = pd.read_csv(final_output, nrows=5)  # Sample to verify
    file_size_mb = final_output.stat().st_size / (1024 * 1024)
    
    logger.info(f"\n{'='*60}")
    logger.info(f"CONSOLIDATION COMPLETE")
    logger.info(f"Output file: {final_output}")
    logger.info(f"File size: {file_size_mb:.2f} MB")
    logger.info(f"Total businesses: {len(completed_apis)}")
    logger.info(f"{'='*60}")
    
    print(f"\n Final dataset saved to: {CONFIG['FINAL_OUTPUT']}")
    print(f" File size: {file_size_mb:.2f} MB")
    print(f" Total businesses processed: {len(completed_apis)}")

# Run consolidation
consolidate_all_data()

## Data Verification and Summary

In [None]:
"""
## Verify Final Output
Re-read the consolidated CSV to confirm it parses. Then print the row count,
the top businesses by review count, a few sample rows, and per-column dtypes
with null counts.
"""

_divider = "=" * 60
print("\n" + _divider)
print("VERIFYING FINAL OUTPUT")
print(_divider + "\n")

try:
    # Stream the file in 10,000-row pieces so it is never fully in memory
    csv_reader = pd.read_csv(CONFIG['FINAL_OUTPUT'], chunksize=10000)

    total_rows = 0
    business_counts = {}

    for piece in csv_reader:
        total_rows += len(piece)
        for name, n in piece['Business_Name'].value_counts().items():
            business_counts[name] = business_counts.get(name, 0) + n

    print(f" Total records: {total_rows:,}")
    print(f" Unique businesses: {len(business_counts)}")
    print("\nTop 10 businesses by review count:")
    print("-" * 60)

    sorted_businesses = sorted(business_counts.items(), key=lambda pair: pair[1], reverse=True)[:10]
    for business, count in sorted_businesses:
        print(f"  {business:<40} {count:>10,} reviews")

    # Show the first few rows as written
    print(f"\n{_divider}")
    print("SAMPLE RECORDS (first 5 rows)")
    print(_divider + "\n")
    sample_df = pd.read_csv(CONFIG['FINAL_OUTPUT'], nrows=5)
    print(sample_df.to_string())

    print(f"\n{_divider}")
    print("DATASET INFORMATION")
    print(_divider + "\n")

    # Dtypes and null counts are inferred from a 1,000-row sample only
    full_df_sample = pd.read_csv(CONFIG['FINAL_OUTPUT'], nrows=1000)
    print("\nColumn Names and Types:")
    print("-" * 60)
    for column_name, column_dtype in full_df_sample.dtypes.items():
        missing = full_df_sample[column_name].isnull().sum()
        print(f"  {column_name:<30} {str(column_dtype):<15} ({missing} nulls in sample)")

    print("\n Data verification complete!")

except FileNotFoundError:
    print(f"✗ Error: Output file not found at {CONFIG['FINAL_OUTPUT']}")
except Exception as e:
    print(f"✗ Error during verification: {e}")

## Pipeline Statistics and Report

In [None]:
"""
## Pipeline Execution Report
Summarize the run recorded in the checkpoint state: timestamps, record
totals, and the per-API success and failure lists, plus the log file paths.
"""

_rule = "=" * 60

print("\n" + _rule)
print("PIPELINE EXECUTION REPORT")
print(_rule + "\n")

# The report reads the in-memory checkpoint state directly
report_data = checkpoint.state

print("EXECUTION SUMMARY")
print("-" * 60)
print(f"Start Time:              {report_data.get('start_time', 'N/A')}")
print(f"Last Update:             {report_data.get('last_update', 'N/A')}")
print(f"Total Records Processed: {report_data.get('total_records_processed', 0):,}")
print(f"Total APIs:              {len(INSURANCE_APIS)}")
print(f"Completed APIs:          {len(report_data.get('completed_apis', []))}")
print(f"Failed APIs:             {len(report_data.get('failed_apis', []))}")

_completed_entries = report_data.get('completed_apis')
if _completed_entries:
    print(f"\n{_rule}")
    print("COMPLETED APIS")
    print(_rule)
    for entry in _completed_entries:
        print(f"\n  {entry['name']}")
        print(f"    Records:   {entry['records']:,}")
        print(f"    Completed: {entry['timestamp']}")

_failed_entries = report_data.get('failed_apis')
if _failed_entries:
    print(f"\n{_rule}")
    print("FAILED APIS (Check error log for details)")
    print(_rule)
    for entry in _failed_entries:
        print(f"\n  {entry['name']}")
        print(f"    Error:     {entry['error']}")
        print(f"    Timestamp: {entry['timestamp']}")

print(f"\n{_rule}")
print("LOG FILES")
print(_rule)
print(f"  Execution Log:  {CONFIG['LOG_FILE']}")
print(f"  Error Log:      {CONFIG['ERROR_LOG']}")
print(f"  Checkpoint:     {CONFIG['CHECKPOINT_FILE']}")

## Cleanup Functions

In [None]:
"""
## Cleanup Utilities
Optional functions to clean up temporary files after successful completion.
ONLY run these after verifying your final CSV is correct!
"""

def cleanup_temp_files():
    """
    Delete every temporary JSON chunk after a successful consolidation.

    Requires typing 'yes' at the prompt. The temp directory is removed as well
    if nothing else remains in it. A missing directory is not treated as an
    error.
    WARNING: Only run this after verifying final output!
    """
    temp_dir = Path(CONFIG['TEMP_DIR'])

    response = input("\n  This will DELETE all temporary files. Are you sure? (yes/no): ")
    if response.strip().lower() != 'yes':
        print("Cleanup cancelled.")
        return

    try:
        json_files = list(temp_dir.glob("*.json"))
        print(f"\nFound {len(json_files)} temporary files to delete...")

        deleted_count = 0
        for file in json_files:
            try:
                file.unlink()
                deleted_count += 1
            except OSError as e:
                logger.error(f"Failed to delete {file}: {e}")

        print(f" Deleted {deleted_count} temporary files")

        # iterdir() raises on a missing directory, so check existence first
        if temp_dir.is_dir() and not any(temp_dir.iterdir()):
            temp_dir.rmdir()
            print(f" Removed empty directory: {temp_dir}")

    except Exception as e:
        print(f"✗ Cleanup failed: {e}")

def reset_pipeline():
    """
    Reset the entire pipeline: checkpoint state and temporary chunk files.

    Requires typing 'yes' at the prompt. The temp directory is (re)created
    afterwards, because cleanup_temp_files may have removed it and the next
    run needs it to write chunk files.
    WARNING: This will force a complete restart!
    """
    response = input("\n  This will RESET the entire pipeline. Are you sure? (yes/no): ")
    if response.strip().lower() != 'yes':
        print("Reset cancelled.")
        return

    try:
        # Reset checkpoint
        checkpoint.reset()
        print(" Checkpoint reset")

        # Clean temp files
        temp_dir = Path(CONFIG['TEMP_DIR'])
        for file in temp_dir.glob("*.json"):
            file.unlink()
        temp_dir.mkdir(parents=True, exist_ok=True)
        print(" Temporary files deleted")

        print("\n Pipeline reset complete. You can now run from the beginning.")

    except Exception as e:
        print(f"✗ Reset failed: {e}")

print("\n" + "="*60)
print("CLEANUP UTILITIES LOADED")
print("="*60)
print("\nAvailable functions:")
print("  cleanup_temp_files()  - Remove temporary JSON chunks")
print("  reset_pipeline()      - Reset checkpoint and start over")
print("\n  Only use these after verifying your final CSV!")

## Resume Pipeline (if interrupted)

In [None]:
"""
## Resume Pipeline
If the pipeline was interrupted, use this cell to resume from where it left off.
The checkpoint system will automatically start from the last completed API.
"""

async def resume_pipeline():
    """
    Resume pipeline execution from the last checkpoint.

    Prints how many APIs are completed and remaining, asks for confirmation,
    and then awaits ``run_pipeline()``. Returns early if the checkpoint's
    resume index already covers every entry in ``INSURANCE_APIS``.

    Note: ``input()`` is synchronous, so the event loop is blocked while
    waiting for the answer.
    """
    resume_index = checkpoint.get_resume_index()
    total_apis = len(INSURANCE_APIS)

    if resume_index >= total_apis:
        print(" All APIs have been processed!")
        print("  Run the consolidation cell if you haven't already.")
        return

    remaining = total_apis - resume_index
    rule = '=' * 60
    print(f"\n{rule}")
    print("RESUMING PIPELINE")
    print(rule)
    print(f"  Completed:  {resume_index} APIs")
    print(f"  Remaining:  {remaining} APIs")
    # Element [1] of each INSURANCE_APIS entry is shown as its display name
    # (presumably the business name -- confirm against the list definition).
    print(f"  Starting from: {INSURANCE_APIS[resume_index][1]}")
    print(f"{rule}\n")

    # Accept "yes" regardless of case or stray surrounding whitespace.
    response = input("Continue? (yes/no): ")
    if response.strip().lower() != 'yes':
        print("Resume cancelled.")
        return

    # Run the pipeline; it is expected to consult the checkpoint and resume itself.
    await run_pipeline()

# Explain how to resume, then report progress recorded in the checkpoint.
print("\n".join([
    "\n" + "=" * 60,
    "RESUME FUNCTIONALITY",
    "=" * 60,
    "\nTo resume an interrupted pipeline, run:",
    "  await resume_pipeline()",
]))
print(f"\nCurrent progress: {checkpoint.get_resume_index()}/{len(INSURANCE_APIS)} APIs completed")


## Memory Usage Monitor

In [None]:
"""
## Memory Usage Monitor
Report current process and system memory usage so you can check that the pipeline stays within the machine's limits.
Useful for debugging and optimization.
"""
def get_memory_usage():
    """Return a snapshot of memory statistics for this process and the host.

    Keys:
        process_mb: resident set size (RSS) of this Python process, in MB.
        system_total_mb: total physical memory reported by psutil, in MB.
        system_used_mb: memory psutil reports as used system-wide, in MB.
        system_percent: system-wide memory usage as reported by psutil (%).
    """
    bytes_per_mb = 1024 * 1024
    rss_bytes = psutil.Process(os.getpid()).memory_info().rss
    host = psutil.virtual_memory()
    return dict(
        process_mb=rss_bytes / bytes_per_mb,
        system_total_mb=host.total / bytes_per_mb,
        system_used_mb=host.used / bytes_per_mb,
        system_percent=host.percent,
    )

def print_memory_status():
    """Print a boxed report of process and system memory usage.

    Any failure while collecting the numbers is reported on stdout instead
    of being raised.
    """
    rule = "=" * 60
    try:
        stats = get_memory_usage()
        print(f"\n{rule}")
        print("MEMORY USAGE")
        print(rule)
        print(f"  Process Memory:      {stats['process_mb']:.2f} MB")
        used_mb, total_mb = stats['system_used_mb'], stats['system_total_mb']
        print(f"  System Memory Used:  {used_mb:.2f} MB / {total_mb:.2f} MB")
        print(f"  System Memory:       {stats['system_percent']:.1f}% used")
        print(rule)
    except ImportError:
        # NOTE(review): psutil is imported at module level, so this only fires
        # if get_memory_usage() itself raises ImportError.
        print("  psutil not installed. Run: pip install psutil")
    except Exception as err:
        print(f"✗ Error getting memory info: {err}")

# Print a memory snapshot as soon as this cell runs.
print_memory_status()

## Advanced Analytics

In [None]:
"""
## Quick Data Analytics
Perform quick analytics on the collected data without loading everything into memory.
"""

def analyze_dataset(path=None, *, top_n=10, chunksize=5000):
    """
    Perform quick analytics on the final dataset and print a report.

    The CSV is streamed in chunks and only the ``Business_Name`` column is
    loaded, so memory use is bounded by ``chunksize`` rows of one column.

    Args:
        path: CSV file to analyze. Defaults to ``CONFIG['FINAL_OUTPUT']``.
        top_n: Number of businesses listed in the top and bottom rankings.
        chunksize: Number of rows read per chunk.

    Problems (missing/empty file, read errors) are printed, not raised.
    """
    from collections import Counter

    # The conditional only touches CONFIG when no explicit path is given.
    csv_path = Path(CONFIG['FINAL_OUTPUT'] if path is None else path)
    rule = "=" * 60

    print("\n" + rule)
    print("DATASET ANALYTICS")
    print(rule + "\n")

    if not csv_path.exists():
        print(f"✗ Dataset not found at {csv_path}")
        return

    try:
        total_records = 0
        business_stats = Counter()

        print("Processing data in chunks...")
        with pd.read_csv(csv_path, usecols=['Business_Name'], chunksize=chunksize) as reader:
            for chunk in reader:
                total_records += len(chunk)
                # value_counts() drops NaN names, so the per-business counts may
                # sum to less than total_records.
                business_stats.update(chunk['Business_Name'].value_counts().to_dict())

        if total_records == 0:
            print(f"✗ Dataset at {csv_path} contains no records")
            return

        # most_common() sorts by count, highest first (ties keep first-seen order).
        ranked = business_stats.most_common()
        unique_count = len(ranked)

        print(f"\n{rule}")
        print("OVERVIEW")
        print(rule)
        print(f"  Total Reviews:       {total_records:,}")
        print(f"  Unique Businesses:   {unique_count}")
        if unique_count:
            print(f"  Average per Business: {total_records / unique_count:.0f}")
        else:
            print("  Average per Business: n/a (no Business_Name values)")

        top = ranked[:top_n]
        # Start the bottom slice after the top slice so small datasets do not
        # list the same businesses twice; numbering uses the overall rank.
        bottom_start = max(len(top), unique_count - top_n)
        bottom = ranked[bottom_start:]

        sections = [
            (f"TOP {len(top)} BUSINESSES (Most Reviews)", top, 1),
            (f"BOTTOM {len(bottom)} BUSINESSES (Least Reviews)", bottom, bottom_start + 1),
        ]
        for title, rows, first_rank in sections:
            if not rows:
                continue
            print(f"\n{rule}")
            print(title)
            print(rule)
            for rank, (business, count) in enumerate(rows, first_rank):
                pct = (count / total_records) * 100
                print(f"  {rank:2d}. {business:<35} {count:>8,} ({pct:>5.2f}%)")

        file_size_mb = csv_path.stat().st_size / (1024 * 1024)
        print(f"\n{rule}")
        print("FILE INFORMATION")
        print(rule)
        print(f"  File Size:     {file_size_mb:.2f} MB")
        print(f"  Average KB per record: {(file_size_mb * 1024) / total_records:.2f} KB")

    except pd.errors.EmptyDataError:
        # Raised by read_csv for a file with no content at all (not even a header).
        print(f"✗ Dataset at {csv_path} is empty")
    except Exception as e:
        print(f"✗ Error during analysis: {e}")
        logger.exception("Analytics failed: %s", e)

# Tell the user how to run the analytics report defined above.
print("\nAnalytics function loaded. Run with:\n  analyze_dataset()")

## Final Summary and Next Steps

In [None]:
"""
## Pipeline Complete! 

The data collection pipeline has been successfully set up and executed.

### What Was Accomplished:
-  Collected reviews from multiple insurance company APIs
-  Implemented fault-tolerant retry logic with exponential backoff
-  Used asynchronous requests for efficient network I/O
-  Saved data incrementally to minimize memory usage
-  Created checkpoint system for resumable execution
-  Consolidated all data into a single CSV file

### Output Files:
The generated files, with their sizes, are listed when this cell runs.
"""

print("\n" + "="*60)
print("PIPELINE SUMMARY")
print("="*60 + "\n")

# Every artifact the pipeline writes, keyed by a human-readable label.
output_files = {
    'Final Dataset': CONFIG['FINAL_OUTPUT'],
    'Execution Log': CONFIG['LOG_FILE'],
    'Error Log': CONFIG['ERROR_LOG'],
    'Checkpoint': CONFIG['CHECKPOINT_FILE'],
    'Temp Directory': CONFIG['TEMP_DIR']
}

print("OUTPUT FILES:")
print("-" * 60)
for name, path in output_files.items():
    file_path = Path(path)
    # Both status prefixes are four characters wide ("    " vs "  ✗ ") so the
    # name/path columns line up for found and missing entries alike.
    if file_path.is_file():
        size_mb = file_path.stat().st_size / (1024 * 1024)
        print(f"    {name:<20} {path:<35} ({size_mb:.2f} MB)")
    elif file_path.is_dir():
        # Count regular files only, matching the "files" label.
        file_count = sum(1 for entry in file_path.iterdir() if entry.is_file())
        print(f"    {name:<20} {path:<35} ({file_count} files)")
    else:
        print(f"  ✗ {name:<20} {path:<35} (not found)")

# Print follow-up actions the user can take with the helpers defined earlier.
print(f"\n{'='*60}")
print("NEXT STEPS")
print("="*60)
print("""
1. Verify your data:
   - Check the final CSV file
   - Review the execution and error logs
   - Run analytics: analyze_dataset()

2. Clean up (optional):
   - Remove temp files: cleanup_temp_files()
   - Reset for fresh run: reset_pipeline()

3. Export to other formats (optional):
   - export_to_format('json')
   - export_to_format('parquet')

4. If interrupted:
   - Resume from checkpoint: await resume_pipeline()

5. Memory monitoring:
   - Check usage: print_memory_status()
""")

print("="*60)
print("Thank you for using the Data Collection Pipeline!")
print("="*60 + "\n")