# Data Migration Tracking v2

### Description
Tracks the migration status of each corporation and the filings made in LEAR after migration.

#### Install Dependencies

In [None]:
# Run this in a cell if you haven't installed these packages
!pip install pandas openpyxl sqlalchemy numpy psycopg2-binary python-dotenv


#### Core Infrastructure and Base Classes

In [None]:
import os
import logging
import pandas as pd
import numpy as np
from sqlalchemy import create_engine, text, pool
from sqlalchemy.exc import SQLAlchemyError
from typing import List, Dict, Optional, Tuple, Any, Protocol
from datetime import datetime
from contextlib import contextmanager
from abc import ABC, abstractmethod
import warnings
# Suppress every warning category globally for the whole session
# (note: this also hides deprecation warnings from the libraries used here)
warnings.simplefilter('ignore')

# Configure logging: INFO and above, timestamped single-line records
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

class DataSource(ABC):
    """Common interface implemented by every pluggable data source.

    A source is identified by ``name`` and driven by a plain config dict.
    Keys read here: ``enabled`` (defaults to True) and ``dependencies``
    (defaults to an empty list).
    """

    def __init__(self, name: str, config: Dict[str, Any]):
        self.name, self.config = name, config
        # A source participates unless its config explicitly turns it off
        self.enabled = config.get('enabled', True)

    @abstractmethod
    def fetch_data(self, corp_nums: List[str], **kwargs) -> pd.DataFrame:
        """Return a DataFrame of rows for the requested corporation numbers."""
        ...

    @abstractmethod
    def get_column_mapping(self) -> Dict[str, str]:
        """Return a {source column: standard column name} rename mapping."""
        ...

    def is_enabled(self) -> bool:
        """Report whether this source should take part in a run."""
        return self.enabled

    def get_dependencies(self) -> List[str]:
        """Return the names of other sources this one declares as prerequisites."""
        declared = self.config.get('dependencies', [])
        return declared


class DatabaseManager:
    """Reusable PostgreSQL connection manager built on a pooled SQLAlchemy engine.

    ``db_config`` must provide 'username', 'password', 'host', 'port' and
    'database'. Each instance creates its own engine, and therefore its own
    connection pool.
    """

    def __init__(self, db_config: Dict[str, str], pool_size: int = 5):
        self.config = db_config
        self.engine = None
        self.pool_size = pool_size
        self._setup_engine()

    @staticmethod
    def _build_connection_string(db_config: Dict[str, str]) -> str:
        """Build a ``postgresql://`` URL with the credentials percent-encoded.

        Without encoding, characters such as '@', ':', '/' or '%' in a username
        or password would be parsed as URL delimiters and corrupt the URL.
        """
        from urllib.parse import quote

        username = quote(str(db_config['username']), safe='')
        password = quote(str(db_config['password']), safe='')
        return (
            f"postgresql://{username}:{password}"
            f"@{db_config['host']}:{db_config['port']}/{db_config['database']}"
        )

    def _setup_engine(self):
        """Create the engine with a bounded QueuePool.

        ``pool_pre_ping`` detects stale connections. ``pool_recycle`` replaces
        connections older than an hour.
        """
        self.engine = create_engine(
            self._build_connection_string(self.config),
            poolclass=pool.QueuePool,
            pool_size=self.pool_size,
            max_overflow=10,
            pool_pre_ping=True,
            pool_recycle=3600
        )

    @contextmanager
    def get_connection(self):
        """Yield a pooled connection and always close it afterwards.

        On ``SQLAlchemyError`` the error is logged, a rollback is attempted and
        the original exception is re-raised. A failing rollback is only logged,
        so it cannot replace the original error.
        """
        conn = None
        try:
            conn = self.engine.connect()
            yield conn
        except SQLAlchemyError as e:
            logger.error(f"Database error: {e}")
            if conn is not None:
                try:
                    conn.rollback()
                except Exception as rollback_error:  # best-effort cleanup only
                    logger.warning(f"Rollback failed: {rollback_error}")
            raise
        finally:
            if conn is not None:
                conn.close()

    def execute_query(self, query: str, params: Optional[Dict] = None) -> pd.DataFrame:
        """Run ``query`` via ``pd.read_sql`` and return the result.

        Any failure is logged and an empty DataFrame is returned instead of
        raising (best-effort behavior that callers rely on).
        """
        try:
            with self.get_connection() as conn:
                return pd.read_sql(query, conn, params=params)
        except Exception as e:
            logger.error(f"Query execution failed: {e}")
            return pd.DataFrame()

    def test_connection(self) -> bool:
        """Return True if ``SELECT 1`` succeeds, otherwise log the error and return False."""
        try:
            with self.get_connection() as conn:
                conn.execute(text("SELECT 1"))
            return True
        except Exception as e:
            logger.error(f"Connection test failed: {e}")
            return False

##################################################################################
##################################################################################

class DatabaseSource(DataSource):
    """Data source backed by a relational database.

    If the config dict contains a ``database_config`` key, a ``DatabaseManager``
    is built from it. Otherwise ``db_manager`` stays None and every query
    short-circuits to an empty DataFrame.
    """

    def __init__(self, name: str, config: Dict[str, Any]):
        super().__init__(name, config)
        self.db_manager = None
        self._setup_database()

    def _setup_database(self):
        """Create the DatabaseManager when connection settings are configured."""
        if 'database_config' not in self.config:
            return
        self.db_manager = DatabaseManager(self.config['database_config'])

    def execute_query(self, query: str, params: Optional[Dict] = None) -> pd.DataFrame:
        """Delegate to the DatabaseManager, or log and return an empty frame without one."""
        manager = self.db_manager
        if manager:
            return manager.execute_query(query, params)
        logger.error(f"No database manager for {self.name}")
        return pd.DataFrame()

logger.info("Core infrastructure loaded")


#### Configuration Manager

In [None]:
class ModularConfig:
    """Configuration manager for modular data sources.

    Construction does three things:
    - loads environment variables (including any ``.env`` file);
    - builds ``data_sources_config``, which ``DataSourceManager`` reads;
    - defines the output column names and processing constants.
    """

    def __init__(self):
        self.load_environment()
        self.setup_data_sources()
        self.setup_constants()

    def load_environment(self):
        """Load file-path and database settings from the environment.

        ``load_dotenv()`` runs first so values from a ``.env`` file are visible.
        Ports default to '5432'; every other unset value is None. Unset required
        values are reported with a warning rather than an error.
        """
        from dotenv import load_dotenv
        load_dotenv()

        # File paths
        self.GROUP_TABLE_FOLDER = os.getenv('GROUP_TABLE_FOLDER')
        self.GROUP_TABLE_FILE_NAME = os.getenv('GROUP_TABLE_FILE_NAME')
        self.OUTPUT_FOLDER = os.getenv('OUTPUT_FOLDER')
        self.COLUMN_FOR_CORP_NUM = os.getenv('COLUMN_FOR_CORP_NUM')

        # Database configurations
        self.colin_extracts_config = {
            'database': os.getenv('COLIN_EXTRACT_DB'),
            'host': os.getenv('CE_HOST_URL'),
            'username': os.getenv('CE_USERNAME'),
            'password': os.getenv('CE_PASSWORD'),
            'port': os.getenv('CE_PORT', '5432')
        }

        self.lear_config = {
            'database': os.getenv('LEAR_DB'),
            'host': os.getenv('LEAR_HOST_URL'),
            'username': os.getenv('LEAR_USERNAME'),
            'password': os.getenv('LEAR_PASSWORD'),
            'port': os.getenv('LEAR_PORT', '5432')
        }

        self._warn_missing_environment()

    def _warn_missing_environment(self):
        """Log one warning listing required environment variables that are unset or empty.

        Without this, a missing value only surfaces later as a confusing failure,
        e.g. a 'None/None' file path or a 'postgresql://None:None@None' URL.
        """
        required = {
            'GROUP_TABLE_FOLDER': self.GROUP_TABLE_FOLDER,
            'GROUP_TABLE_FILE_NAME': self.GROUP_TABLE_FILE_NAME,
            'OUTPUT_FOLDER': self.OUTPUT_FOLDER,
            'COLUMN_FOR_CORP_NUM': self.COLUMN_FOR_CORP_NUM,
            'COLIN_EXTRACT_DB': self.colin_extracts_config['database'],
            'CE_HOST_URL': self.colin_extracts_config['host'],
            'CE_USERNAME': self.colin_extracts_config['username'],
            'CE_PASSWORD': self.colin_extracts_config['password'],
            'LEAR_DB': self.lear_config['database'],
            'LEAR_HOST_URL': self.lear_config['host'],
            'LEAR_USERNAME': self.lear_config['username'],
            'LEAR_PASSWORD': self.lear_config['password'],
        }
        missing = [var for var, value in required.items() if not value]
        if missing:
            logger.warning(f"Missing required environment variables: {', '.join(missing)}")

    def setup_data_sources(self):
        """Configure data sources - easy to modify for future needs.

        Each entry maps a source name to its settings:
        - 'class': must be a key of DataSourceRegistry.source_classes;
        - 'enabled', 'database_config', 'priority' (lower runs first),
          optional 'dependencies', and 'description'.
        """
        self.data_sources_config = {
            'corp_basic_info': {
                'class': 'ColinExtractsCorporationBasicSource',
                'enabled': True,
                'database_config': self.colin_extracts_config,
                'priority': 1,  # Lower number = higher priority
                'description': 'Corp type, Admin email from Colin Extracts'
            },
            'corp_names': {
                'class': 'ColinExtractsCorporationNamesSource',
                'enabled': True,
                'database_config': self.colin_extracts_config,
                'priority': 2,
                'description': 'Corporation names from Colin Extracts'
            },
            'migration_status': {
                'class': 'ColinExtractsMigrationStatusSource',
                'enabled': True,
                'database_config': self.colin_extracts_config,
                'priority': 3,
                'description': 'Migration processing status from Colin Extracts'
            },
            'lear_filings': {
                'class': 'LearFilingsSource',
                'enabled': True,
                'database_config': self.lear_config,
                'priority': 4,
                'description': 'Post-migration filings from LEAR'
            },
            # Future data sources can be added here, e.g.:
            # 'corp_addresses': {
            #     'class': 'ColinCorporationAddressSource',
            #     'enabled': True,
            #     'database_config': self.colin_extracts_config,
            #     'priority': 5,
            #     'dependencies': ['corp_basic_info'],
            #     'description': 'Corporation addresses from Colin Extracts'
            # },
            # 'directors': {
            #     'class': 'ColinDirectorsSource',
            #     'enabled': False,  # Can be disabled easily
            #     'database_config': self.colin_extracts_config,
            #     'priority': 6,
            #     'description': 'Director information from Colin Extracts'
            # }
        }

    def setup_constants(self):
        """Define output column headers and processing constants."""
        # Internal key -> column header used in the exported spreadsheet
        self.STANDARD_COLUMNS = {
            "corp_num": "Incorporation Number",
            "corp_name": "Company Name",
            "corp_type": "Type",
            "email": "Admin Email",
            "status": "Migration Status",
            "date": "Migrated Date",
            "filings": "Filings Done",
            "filing_date": "Last Filing Date"
            # Future columns can be added here:
            # "address": "Registered Address",
            # "directors": "Current Directors"
        }

        # Number of corporation numbers sent to a source per fetch_data call
        self.CHUNK_SIZE = 1000
        self.PRINT_DIVIDER = "=" * 50

config = ModularConfig()
logger.info("Configuration loaded")


#### Specific Data Source Implementations

In [None]:
class ColinExtractsCorporationBasicSource(DatabaseSource):
    """Fetches basic corporation information (type, admin email) from Colin Extracts."""

    def fetch_data(self, corp_nums: List[str], **kwargs) -> pd.DataFrame:
        """Fetch corp_num, corp_type_cd and admin_email from ``public.corporation``.

        Args:
            corp_nums: corporation numbers to look up.
            **kwargs: accepted for interface compatibility; unused.

        Returns:
            Matching rows ordered by corp_num. Returns an empty DataFrame when
            ``corp_nums`` is empty or when the query fails (errors are logged
            by the database manager).
        """
        if not corp_nums:
            return pd.DataFrame()

        # The numbers are bound as one array parameter instead of being spliced
        # into the SQL text. Quotes or other characters coming from the input
        # spreadsheet therefore cannot break (or inject into) the query.
        query = text("""
        SELECT
            corp_num,
            corp_type_cd,
            admin_email
        FROM public.corporation
        WHERE corp_num = ANY(:corp_nums)
        ORDER BY corp_num
        """)

        logger.info(f"Fetching basic info for {len(corp_nums)} corporations from Colin Extracts")
        return self.execute_query(query, {'corp_nums': list(corp_nums)})

    def get_column_mapping(self) -> Dict[str, str]:
        """Map source columns to standard columns"""
        return {
            'corp_num': config.STANDARD_COLUMNS['corp_num'],
            'corp_type_cd': config.STANDARD_COLUMNS['corp_type'],
            'admin_email': config.STANDARD_COLUMNS['email']
        }


class ColinExtractsCorporationNamesSource(DatabaseSource):
    """Fetches current corporation names from the corp_name table."""

    def fetch_data(self, corp_nums: List[str], **kwargs) -> pd.DataFrame:
        """Fetch corp_num and corp_name for names that are still current.

        Only name types 'CO' and 'NB' are selected, and only rows whose
        ``end_event_id`` is NULL.

        Args:
            corp_nums: corporation numbers to look up.
            **kwargs: accepted for interface compatibility; unused.

        Returns:
            Matching rows ordered by corp_num. Returns an empty DataFrame when
            ``corp_nums`` is empty or when the query fails.
        """
        if not corp_nums:
            return pd.DataFrame()

        # Bind the numbers as an array parameter rather than building the IN
        # list by string concatenation.
        query = text("""
        SELECT
            corp_num,
            corp_name
        FROM public.corp_name
        WHERE corp_num = ANY(:corp_nums)
        AND corp_name_typ_cd IN ('CO', 'NB')
        AND end_event_id IS NULL
        ORDER BY corp_num
        """)

        logger.info(f"Fetching names for {len(corp_nums)} corporations from Colin Extracts")
        return self.execute_query(query, {'corp_nums': list(corp_nums)})

    def get_column_mapping(self) -> Dict[str, str]:
        """Map source columns to standard columns"""
        return {
            'corp_num': config.STANDARD_COLUMNS['corp_num'],
            'corp_name': config.STANDARD_COLUMNS['corp_name']
        }


class ColinExtractsMigrationStatusSource(DatabaseSource):
    """Fetches migration status and date from the corp_processing table."""

    def fetch_data(self, corp_nums: List[str], **kwargs) -> pd.DataFrame:
        """Fetch migration processing status and its create_date.

        Args:
            corp_nums: corporation numbers to look up.
            **kwargs: accepted for interface compatibility; unused.

        Returns:
            Rows ordered by corp_num.
            - ``processed_status`` is rewritten to 'Migrated' (source value
              'COMPLETED') or 'Pending' (any other value, including NULL).
            - ``create_date`` is converted to a ``datetime.date``; unparseable
              values become NaT.
            Returns an empty DataFrame when ``corp_nums`` is empty or the query fails.
        """
        if not corp_nums:
            return pd.DataFrame()

        # Bind the numbers as an array parameter rather than splicing them into SQL
        query = text("""
        SELECT
            corp_num,
            processed_status,
            create_date
        FROM public.corp_processing
        WHERE corp_num = ANY(:corp_nums)
        ORDER BY corp_num
        """)

        logger.info(f"Fetching migration status for {len(corp_nums)} corporations from Colin Extracts")
        df = self.execute_query(query, {'corp_nums': list(corp_nums)})

        # Apply transformations
        if not df.empty:
            df['processed_status'] = np.where(
                df['processed_status'] == 'COMPLETED', 'Migrated', 'Pending'
            )
            df['create_date'] = pd.to_datetime(df['create_date'], errors='coerce').dt.date

        return df

    def get_column_mapping(self) -> Dict[str, str]:
        """Map source columns to standard columns"""
        return {
            'corp_num': config.STANDARD_COLUMNS['corp_num'],
            'processed_status': config.STANDARD_COLUMNS['status'],
            'create_date': config.STANDARD_COLUMNS['date']
        }


class LearFilingsSource(DatabaseSource):
    """Fetches post-migration LEAR filings and the last filing date per business."""

    def fetch_data(self, corp_nums: List[str], **kwargs) -> pd.DataFrame:
        """Aggregate completed, LEAR-sourced filings per business identifier.

        Args:
            corp_nums: identifiers to look up, matched against ``businesses.identifier``.
            **kwargs: accepted for interface compatibility; unused.

        Returns:
            One row per identifier with columns:
            - ``identifier``;
            - ``filings_done``: distinct filing types sorted by filing_type,
              then converted to Title Case;
            - ``last_filing_date``: a ``datetime.date``.
            Identifiers with no matching filings produce no row. Returns an
            empty DataFrame when ``corp_nums`` is empty or the query fails.
        """
        if not corp_nums:
            return pd.DataFrame()

        # Filter and aggregate in one pass.
        # - The ORDER BY inside STRING_AGG keeps the filing list deterministic
        #   between runs.
        # - Identifiers are bound as an array parameter instead of being
        #   spliced into the SQL text.
        query = text("""
        SELECT
            b.identifier,
            STRING_AGG(DISTINCT f.filing_type, ', ' ORDER BY f.filing_type) AS filings_done,
            MAX(f.filing_date) AS last_filing_date
        FROM public.businesses b
        JOIN public.filings f ON b.id = f.business_id
        WHERE b.identifier = ANY(:corp_nums)
        AND f.source = 'LEAR'
        AND f.status = 'COMPLETED'
        GROUP BY b.identifier
        """)

        logger.info(f"Fetching LEAR filings for {len(corp_nums)} corporations")
        df = self.execute_query(query, {'corp_nums': list(corp_nums)})

        # Apply transformations
        if not df.empty:
            df['last_filing_date'] = pd.to_datetime(df['last_filing_date'], errors='coerce').dt.date
            df['filings_done'] = df['filings_done'].apply(self._convert_filings_to_title_case)

        return df

    def get_column_mapping(self) -> Dict[str, str]:
        """Map source columns to standard columns"""
        return {
            'identifier': config.STANDARD_COLUMNS['corp_num'],
            'filings_done': config.STANDARD_COLUMNS['filings'],
            'last_filing_date': config.STANDARD_COLUMNS['filing_date']
        }

    def _convert_filings_to_title_case(self, filings_string: Optional[str]) -> Optional[str]:
        """Convert a comma-separated list of camelCase filing types to Title Case.

        Example: 'annualReport, changeOfAddress' -> 'Annual Report, Change Of Address'.
        Non-string values (None/NaN) are returned unchanged, so one missing
        value cannot make the whole source fail.
        """
        import re
        if not isinstance(filings_string, str):
            return filings_string
        # Insert a space at each lowercase->uppercase boundary, then capitalise each word
        result = re.sub(r'(?<=[a-z])(?=[A-Z])', ' ', filings_string)
        return result.title()


#### Data Source Registry and Manager

In [None]:
class DataSourceRegistry:
    """Instantiates data sources from config by class name and stores them by source name."""

    def __init__(self):
        # source name -> DataSource instance
        self.sources = {}
        # Allowed values for a source config's 'class' key
        self.source_classes = {
            'ColinExtractsCorporationBasicSource': ColinExtractsCorporationBasicSource,
            'ColinExtractsCorporationNamesSource': ColinExtractsCorporationNamesSource,
            'ColinExtractsMigrationStatusSource': ColinExtractsMigrationStatusSource,
            'LearFilingsSource': LearFilingsSource
            # Future data source classes can be registered here
        }

    def register_source(self, name: str, source_config: Dict[str, Any]) -> bool:
        """Instantiate and register one source.

        Returns:
            True on success. False if the class name is unknown or construction
            raised; the problem is logged.
        """
        try:
            class_name = source_config['class']
            if class_name not in self.source_classes:
                logger.error(f"Unknown data source class: {class_name}")
                return False

            source_class = self.source_classes[class_name]
            source_instance = source_class(name, source_config)

            self.sources[name] = source_instance
            logger.info(f"Registered data source: {name}")
            return True

        except Exception as e:
            logger.error(f"Failed to register data source {name}: {e}")
            return False

    def get_source(self, name: str) -> Optional[DataSource]:
        """Get a data source by name (None if not registered)."""
        return self.sources.get(name)

    def get_enabled_sources(self) -> List[Tuple[str, DataSource]]:
        """Return (name, source) pairs for enabled sources, sorted by ascending 'priority' (default 999)."""
        enabled_sources = [
            (name, source) for name, source in self.sources.items()
            if source.is_enabled()
        ]

        # Sort by priority (lower number = higher priority)
        enabled_sources.sort(key=lambda x: x[1].config.get('priority', 999))
        return enabled_sources

    def validate_dependencies(self) -> bool:
        """Check that each *enabled* source's dependencies are registered and enabled.

        Disabled sources are skipped: they never run, so their dependencies do
        not have to be satisfied. Every problem found is logged before the
        method returns.

        Returns:
            True if no problems were found.
        """
        problems = []
        for name, source in self.sources.items():
            if not source.is_enabled():
                continue
            for dep in source.get_dependencies():
                dep_source = self.sources.get(dep)
                if dep_source is None:
                    problems.append(f"Data source {name} depends on {dep} which is not registered")
                elif not dep_source.is_enabled():
                    problems.append(f"Data source {name} depends on {dep} which is disabled")

        for message in problems:
            logger.error(message)
        return not problems


class DataSourceManager:
    """Builds the source registry from configuration and merges data from every enabled source."""

    def __init__(self, config: ModularConfig):
        self.config = config
        self.registry = DataSourceRegistry()
        self._setup_sources()

    def _setup_sources(self):
        """Register every configured data source, then validate declared dependencies.

        Raises:
            ValueError: if dependency validation fails.
        """
        for name, source_config in self.config.data_sources_config.items():
            self.registry.register_source(name, source_config)

        if not self.registry.validate_dependencies():
            raise ValueError("Data source dependency validation failed")

    def fetch_all_data(self, corp_nums: List[str]) -> pd.DataFrame:
        """Fetch data from all enabled sources and left-join it onto the input list.

        Sources run in priority order, and each one is queried in chunks of
        ``CHUNK_SIZE`` numbers. A source that raises is logged and skipped; the
        others still run.

        Args:
            corp_nums: corporation numbers. The result has one row per entry
                unless a source returns duplicate keys, in which case a warning
                is logged.

        Returns:
            The merged DataFrame. The filings / last-filing-date columns have
            missing values replaced with ''. Returns an empty DataFrame if
            ``corp_nums`` is empty.
        """
        if not corp_nums:
            logger.error("No corporation numbers provided")
            return pd.DataFrame()

        columns = self.config.STANDARD_COLUMNS
        merge_column = columns['corp_num']
        chunk_size = self.config.CHUNK_SIZE

        # Start with one row per requested corporation; every source is left-joined onto it
        main_df = pd.DataFrame({merge_column: corp_nums})

        enabled_sources = self.registry.get_enabled_sources()
        logger.info(f"Processing {len(enabled_sources)} enabled data sources")

        for source_name, source in enabled_sources:
            try:
                logger.info(f"Fetching data from: {source_name}")

                # Process in chunks to bound the size of each query
                all_data = []
                for start in range(0, len(corp_nums), chunk_size):
                    chunk_data = source.fetch_data(corp_nums[start:start + chunk_size])
                    if not chunk_data.empty:
                        all_data.append(chunk_data)

                if not all_data:
                    logger.warning(f"No data returned from {source_name}")
                    continue

                source_df = pd.concat(all_data, ignore_index=True)

                # Apply column mapping
                source_df = source_df.rename(columns=source.get_column_mapping())

                # A left merge repeats a corporation's row once per matching source row
                duplicate_count = int(source_df[merge_column].duplicated().sum())
                if duplicate_count:
                    logger.warning(
                        f"{source_name} returned {duplicate_count} duplicate rows for "
                        f"'{merge_column}'; those corporations will appear more than once"
                    )

                main_df = main_df.merge(source_df, on=merge_column, how='left')
                logger.info(f"Successfully merged data from {source_name}")

            except Exception as e:
                # Continue with other sources even if one fails
                logger.error(f"Failed to fetch data from {source_name}: {e}")

        # Fill missing values for optional columns
        optional_columns = [columns['filings'], columns['filing_date']]
        for col in optional_columns:
            if col in main_df.columns:
                main_df[col] = main_df[col].fillna('')
                if 'date' in col.lower():
                    # Render dates as text so blanks and dates share one column type
                    main_df[col] = main_df[col].astype(str).replace('NaT', '')

        logger.info(f"Final dataset shape: {main_df.shape}")
        return main_df

    def add_data_source(self, name: str, source_config: Dict[str, Any]) -> bool:
        """Dynamically add a new data source (dependencies are not re-validated)."""
        return self.registry.register_source(name, source_config)

    def disable_data_source(self, name: str) -> bool:
        """Disable a registered data source; return False if the name is unknown."""
        source = self.registry.get_source(name)
        if source:
            source.enabled = False
            logger.info(f"Disabled data source: {name}")
            return True
        return False

    def enable_data_source(self, name: str) -> bool:
        """Enable a registered data source; return False if the name is unknown."""
        source = self.registry.get_source(name)
        if source:
            source.enabled = True
            logger.info(f"Enabled data source: {name}")
            return True
        return False

    def list_sources(self) -> Dict[str, Dict[str, Any]]:
        """List all registered data sources with their enabled flag, priority, description and dependencies."""
        result = {}
        for name, source in self.registry.sources.items():
            result[name] = {
                'enabled': source.is_enabled(),
                'priority': source.config.get('priority', 999),
                'description': source.config.get('description', ''),
                'dependencies': source.get_dependencies()
            }
        return result

# Initialize the data source manager.
# This registers every entry of config.data_sources_config and raises
# ValueError if dependency validation fails.
data_manager = DataSourceManager(config)

# Display registered sources: enabled state, priority, description and any dependencies.
# Note: `name`, `info` and `status` remain as notebook globals after this loop.
print("Registered Data Sources:")
print("=" * 50)
for name, info in data_manager.list_sources().items():
    status = "✅ ENABLED" if info['enabled'] else "❌ DISABLED"
    print(f"{name}: {status}")
    print(f"  Priority: {info['priority']}")
    print(f"  Description: {info['description']}")
    if info['dependencies']:
        print(f"  Dependencies: {', '.join(info['dependencies'])}")
    print()


#### Main Execution Engine

In [None]:
class MigrationTrackingEngine:
    """Runs the pipeline: load input corp numbers, fetch/merge data, export to Excel, print a summary."""

    def __init__(self, config: ModularConfig, data_manager: DataSourceManager):
        self.config = config
        self.data_manager = data_manager

    def load_corporation_numbers(self) -> List[str]:
        """Load corporation numbers from the configured Excel file ('Sheet1').

        The column is read as text so purely numeric IDs do not become floats
        (e.g. '123.0'). Surrounding whitespace is stripped and blank cells are
        dropped.

        Returns:
            The corporation numbers in file order, or [] if loading fails
            (the error is logged).
        """
        try:
            file_path = os.path.join(self.config.GROUP_TABLE_FOLDER, self.config.GROUP_TABLE_FILE_NAME)
            logger.info(f"Loading corporation numbers from: {file_path}")

            column = self.config.COLUMN_FOR_CORP_NUM
            df = pd.read_excel(
                file_path,
                sheet_name="Sheet1",
                usecols=[column],
                dtype={column: str}
            )

            values = df[column].dropna().astype(str).str.strip()
            corp_nums = values[values != ''].tolist()
            logger.info(f"Loaded {len(corp_nums)} corporation numbers")
            return corp_nums

        except Exception as e:
            logger.error(f"Failed to load corporation numbers: {e}")
            return []

    def run_full_process(self) -> pd.DataFrame:
        """Run the complete migration tracking process.

        Returns:
            The merged result, or an empty DataFrame if no input numbers were
            loaded or no data came back.

        Raises:
            Any exception from fetching/exporting is logged and re-raised.
        """
        try:
            logger.info("Starting modular migration tracking process")
            logger.info(self.config.PRINT_DIVIDER)
            start_time = datetime.now()

            # Step 1: Load corporation numbers
            corp_nums = self.load_corporation_numbers()
            if not corp_nums:
                logger.error("No corporation numbers loaded")
                return pd.DataFrame()

            # Step 2: Fetch all data using modular sources
            logger.info("Fetching data from all configured sources...")
            result_df = self.data_manager.fetch_all_data(corp_nums)

            if result_df.empty:
                logger.error("No data retrieved")
                return pd.DataFrame()

            # Step 3: Export results
            output_file = self._export_results(result_df)

            # Step 4: Generate summary
            duration = datetime.now() - start_time
            self._print_summary(result_df, output_file, duration)

            return result_df

        except Exception as e:
            logger.error(f"Process execution failed: {e}")
            raise

    def _export_results(self, df: pd.DataFrame) -> str:
        """Write ``df`` to a timestamped .xlsx file in OUTPUT_FOLDER and return its path.

        The header row is bold and frozen. Each column width is set to the
        longest cell text + 2, capped at 50.
        """
        from openpyxl.styles import Font

        sheet_name = 'Migration Tracking'

        # Prepare filename
        output_path = os.path.join(self.config.OUTPUT_FOLDER, "migration_tracking.xlsx")
        final_path = self._generate_unique_filename(output_path)

        # Export with formatting
        with pd.ExcelWriter(final_path, engine='openpyxl') as writer:
            df.to_excel(writer, sheet_name=sheet_name, index=False)

            worksheet = writer.sheets[sheet_name]

            # Freeze the first row
            worksheet.freeze_panes = 'A2'

            # Format header
            for cell in worksheet[1]:
                cell.font = Font(bold=True)

            # Adjust column widths; empty cells count as 0 characters
            # (str(None) would otherwise count as 4)
            for column in worksheet.columns:
                column_letter = column[0].column_letter
                max_length = max(
                    (len(str(cell.value)) for cell in column if cell.value is not None),
                    default=0,
                )
                worksheet.column_dimensions[column_letter].width = min(max_length + 2, 50)

        logger.info(f"Results exported to: {final_path}")
        return final_path

    def _generate_unique_filename(self, original_path: str) -> str:
        """Insert a '_YYYYmmdd_HHMMSS' timestamp before the file extension."""
        directory = os.path.dirname(original_path)
        name, ext = os.path.splitext(os.path.basename(original_path))

        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        return os.path.join(directory, f"{name}_{timestamp}{ext}")

    def _print_summary(self, df: pd.DataFrame, output_file: str, duration):
        """Print totals, elapsed time, output path and the enabled data sources.

        Args:
            duration: elapsed time as computed in run_full_process (a difference of two datetimes).
        """
        columns = self.config.STANDARD_COLUMNS
        status_col = columns['status']
        filings_col = columns['filings']

        print(f"\n{self.config.PRINT_DIVIDER}")
        print("MIGRATION TRACKING COMPLETED")
        print(f"{self.config.PRINT_DIVIDER}")

        # Basic stats
        total_corps = len(df)
        migrated_corps = int((df[status_col] == 'Migrated').sum()) if status_col in df.columns else 0
        corps_with_filings = int((df[filings_col] != '').sum()) if filings_col in df.columns else 0

        print("Results Summary:")
        print(f"   Total Corporations: {total_corps}")
        print(f"   Migrated: {migrated_corps}")
        print(f"   With Filings: {corps_with_filings}")
        print(f"   Processing Time: {duration}")
        print(f"   Output File: {output_file}")

        # Data sources used
        print("\nData Sources Used:")
        for name, source in self.data_manager.registry.get_enabled_sources():
            print(f"   ✅ {name}: {source.config.get('description', 'No description')}")

        print(f"\n{self.config.PRINT_DIVIDER}")

# Initialize the engine
engine = MigrationTrackingEngine(config, data_manager)


### Main Execution

In [None]:
def main_execution(mode: str) -> None:
    """Run the migration-tracking pipeline and display the resulting DataFrame.

    Args:
        mode: 'prod' shows the first 10 rows; 'test' shows every row. Any other
            value (including None or '') prints an error and returns without
            running the pipeline.

    Errors from the pipeline are logged with a traceback rather than propagated.
    Uses the module-level ``engine`` and ``display`` (presumably IPython's
    notebook builtin — not defined in this file).
    """
    if mode not in ('prod', 'test'):
        print("Invalid mode, current available modes: prod, test")
        return

    try:
        # Run the complete process
        result_df = engine.run_full_process()

        if result_df.empty:
            return

        if mode == 'prod':
            print("\nSample Results:")
            display(result_df.head(10))
        else:
            print("\nTest mode, display full dataframe:")
            with pd.option_context("display.max_rows", None):
                display(result_df)

    except Exception as e:
        logger.exception(f"Main execution failed: {e}")

#############################################################
# Entry point: 'test' displays every result row; 'prod' displays a 10-row preview
main_execution(mode='test')
