In [68]:
import sys

# !{sys.executable} -m pip install pyodbc pandas 
# !{sys.executable} -m pip install texttable

In [7]:
import pyodbc
import sqlite3
import os
from datetime import datetime

class AccessToSQLiteConverter:
    def __init__(self, access_db_path, sqlite_db_path):
        self.access_db_path = access_db_path
        self.sqlite_db_path = sqlite_db_path

    def safe_decode(self, value):
        """
        Safely decodes problematic strings from Access database
        with multiple fallback strategies.
        """
        if value is None:
            return None
        if isinstance(value, str):
            return value
        if isinstance(value, bytes):
            try:
                # First try UTF-16-LE (Access default)
                return value.decode('utf-16-le')
            except UnicodeDecodeError:
                try:
                    # Fallback to UTF-8 with replacement
                    return value.decode('utf-8', errors='replace')
                except UnicodeDecodeError:
                    try:
                        # Try Latin-1 as last resort
                        return value.decode('latin-1', errors='replace')
                    except:
                        # Final fallback - return as escaped string
                        return str(value)[2:-1]  # Remove b'...' wrapper
        return str(value)

    def get_access_schema(self, access_conn, table_name):
        """Extract schema (columns, types, constraints) from Access table."""
        cursor = access_conn.cursor()
        columns = []
        auto_number_cols = []
        
        # Get column info
        for col in cursor.columns(table=table_name):
            col_name = col.column_name
            col_type = col.type_name
            is_nullable = col.is_nullable == 'YES'
            # Map Access types to SQLite
            if col_type in ['INTEGER', 'LONG']:
                sqlite_type = 'INTEGER'
            elif col_type in ['VARCHAR', 'TEXT', 'MEMO']:
                sqlite_type = 'TEXT'
            elif col_type == 'DATETIME':
                sqlite_type = 'TEXT'  # SQLite stores dates as TEXT
            elif col_type in ['DOUBLE', 'SINGLE']:
                sqlite_type = 'REAL'
            else:
                sqlite_type = 'TEXT'  # Fallback
            
            # Check if AutoNumber
            is_auto_number = False
            if col.remarks and 'AUTOINCREMENT' in col.remarks.upper():
                is_auto_number = True
                auto_number_cols.append(col_name)
            
            columns.append({
                'name': col_name,
                'type': sqlite_type,
                'nullable': is_nullable,
                'auto_number': is_auto_number
            })
        
        # Get primary keys
        primary_keys = []
        try:
            for row in cursor.statistics(table=table_name, type='INDEX'):
                if row[5] == 'PRIMARY':
                    primary_keys.append(row[8])
        except pyodbc.Error:
            pass
        
        # Get foreign keys
        foreign_keys = []
        try:
            for row in cursor.foreignKeys(table=table_name):
                fk = {
                    'column': row[7],
                    'ref_table': row[2],
                    'ref_column': row[3]
                }
                foreign_keys.append(fk)
        except pyodbc.Error:
            pass
        
        return columns, primary_keys, foreign_keys, auto_number_cols

    def create_sqlite_table(self, sqlite_cursor, table_name, columns, primary_keys, foreign_keys, auto_number_cols):
        """Create SQLite table with schema matching Access."""
        col_defs = []
        for col in columns:
            col_def = f'"{col["name"]}" {col["type"]}'
            if col['name'] in auto_number_cols:
                col_def += ' PRIMARY KEY AUTOINCREMENT'
            elif col['name'] in primary_keys and col['name'] not in auto_number_cols:
                col_def += ' PRIMARY KEY'
            if not col['nullable']:
                col_def += ' NOT NULL'
            col_defs.append(col_def)
        
        for fk in foreign_keys:
            fk_def = f'FOREIGN KEY ("{fk["column"]}") REFERENCES "{fk["ref_table"]}" ("{fk["ref_column"]}")'
            col_defs.append(fk_def)
        
        create_sql = f'CREATE TABLE "{table_name}" ({", ".join(col_defs)});'
        sqlite_cursor.execute(create_sql)

    def transfer_data(self, access_conn, sqlite_conn, table_name, columns):
        """Transfer data from Access to SQLite, applying safe_decode to text."""
        access_cursor = access_conn.cursor()
        sqlite_cursor = sqlite_conn.cursor()
        
        # Escape square brackets in f-string
        col_names = [f'[[]{col["name"]}[]]' for col in columns]
        select_sql = f'SELECT {", ".join(col_names)} FROM [{table_name}]'
        access_cursor.execute(select_sql)
        
        # # Simplified f-string for INSERT
        # placeholders = ','.join(['?' for _ in columns])
        # insert_sql = f'INSERT INTO "{table_name}" ({", ".join(f'"{col["name"]}"' for col in columns)}) VALUES ({placeholders})'
        insert_sql = f"""INSERT INTO "{table_name}" ({', '.join(f'"{col["name"]}"' for col in columns)}) VALUES ({','.join(['?' for _ in columns])})"""
        
        for row in access_cursor:
            decoded_row = [
                self.safe_decode(val) if col['type'] == 'TEXT' else val
                for val, col in zip(row, columns)
            ]
            sqlite_cursor.execute(insert_sql, decoded_row)
        
        sqlite_conn.commit()

    def convert(self):
        """Main conversion process."""
        try:
            # Connect to Access
            access_conn_str = (
                r'DRIVER={Microsoft Access Driver (*.mdb, *.accdb)};'
                f'DBQ={self.access_db_path};'
            )
            access_conn = pyodbc.connect(access_conn_str)
            access_cursor = access_conn.cursor()
            
            # Connect to SQLite
            os.makedirs(os.path.dirname(self.sqlite_db_path), exist_ok=True)
            sqlite_conn = sqlite3.connect(self.sqlite_db_path)
            sqlite_cursor = sqlite_conn.cursor()
            
            # Enable WAL mode
            sqlite_cursor.execute('PRAGMA journal_mode=WAL;')
            if sqlite_cursor.fetchone()[0] != 'wal':
                raise Exception('Failed to enable WAL mode')
            
            # Set auto-checkpoint
            sqlite_cursor.execute('PRAGMA wal_autocheckpoint=100;')
            
            # Enable incremental auto-vacuum
            sqlite_cursor.execute('PRAGMA auto_vacuum=2;')
            
            # Enable foreign keys
            sqlite_cursor.execute('PRAGMA foreign_keys=ON;')
            
            # Get tables
            tables = [row.table_name for row in access_cursor.tables() if row.table_type == 'TABLE']
            
            # Process tables
            for table in tables:
                print(f'Processing table: {table}')
                
                columns, primary_keys, foreign_keys, auto_number_cols = self.get_access_schema(access_conn, table)
                self.create_sqlite_table(sqlite_cursor, table, columns, primary_keys, foreign_keys, auto_number_cols)
                self.transfer_data(access_conn, sqlite_conn, table, columns)
                
                # Update auto-increment sequence
                for col in auto_number_cols:
                    sqlite_cursor.execute(f'SELECT MAX("{col}") FROM "{table}"')
                    max_id = sqlite_cursor.fetchone()[0] or 0
                    sqlite_cursor.execute(
                        f'INSERT OR REPLACE INTO sqlite_sequence (name, seq) VALUES (?, ?)',
                        (table, max_id)
                    )
            
            # Perform full checkpoint
            sqlite_cursor.execute('PRAGMA wal_checkpoint(FULL);')
            
            # Verify integrity
            sqlite_cursor.execute('PRAGMA integrity_check;')
            if sqlite_cursor.fetchone()[0] != 'ok':
                raise Exception('SQLite database integrity check failed')
            
            sqlite_conn.commit()
            print(f'Migration completed successfully. SQLite database saved at: {self.sqlite_db_path}')
            
        except Exception as e:
            print(f'Error during migration: {str(e)}')
            if 'sqlite_conn' in locals():
                sqlite_conn.rollback()
        
        finally:
            if 'access_conn' in locals():
                access_conn.close()
            if 'sqlite_conn' in locals():
                sqlite_conn.close()

# def main():
#     access_db_path = r'C:\Database\myapp_be.accdb'  # Update path
#     sqlite_db_path = r'C:\Database\myapp.db'  # Update path (local SSD)
#     converter = AccessToSQLiteConverter(access_db_path, sqlite_db_path)
#     converter.convert()

# if __name__ == '__main__':
#     main()

In [9]:
access_db = r'C:\tmp\access_to_sqlite\Database1_be.accdb'  # Update path
sqlite_db = r'C:\tmp\access_to_sqlite\northwind_be.db'  # Update path

converter = AccessToSQLiteConverter(access_db, sqlite_db)
converter.convert()

Processing table: Customers
Error during migration: 'type' is an invalid keyword argument for this function


In [70]:
# Usage Examples
# Basic migration:

# bash
# python migrate_access_to_sqlite.py input.mdb output.sqlite
# Excluding specific tables:

# bash
# python migrate_access_to_sqlite.py input.accdb output.sqlite --exclude Table1 Table2

In [71]:
start_time = datetime.now()
logging.info(f"Starting migration at {start_time}")

access_db = r'C:\tmp\access_to_sqlite\Database1_be.accdb'  # Update path
sqlite_db = r'C:\tmp\access_to_sqlite\northwind_be.db'  # Update path
# exclude = ['Table1', 'Table2']  # Example tables to exclude
exclude = []  # No tables excluded in this example

#if sqlite_db exists, then make a backup of it
if os.path.exists(sqlite_db):
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    backup_path = f"{sqlite_db}.{timestamp}.bak"
    logging.info(f"SQLite database already exists. Renaming to {backup_path}")
    os.rename(sqlite_db, backup_path)

with AccessDBToSQLiteMigrator(access_db, sqlite_db) as migrator:
    migrator.transfer_all_tables(exclude_tables=exclude)

end_time = datetime.now()
duration = end_time - start_time
logging.info(f"Migration completed successfully at {end_time}")
logging.info(f"Total duration: {duration}")

2025-05-26 20:30:18,167 - INFO - Starting migration at 2025-05-26 20:30:18.167378
2025-05-26 20:30:18,169 - INFO - SQLite database already exists. Renaming to C:\tmp\access_to_sqlite\northwind_be.db.20250526_203018.bak
2025-05-26 20:30:18,172 - INFO - Initialized migrator with Access DB: C:\tmp\access_to_sqlite\Database1_be.accdb
2025-05-26 20:30:18,172 - INFO - SQLite DB will be created at: C:\tmp\access_to_sqlite\northwind_be.db
2025-05-26 20:30:18,287 - INFO - Successfully connected to Access database
2025-05-26 20:30:18,292 - INFO - Successfully connected to SQLite database
2025-05-26 20:30:18,295 - INFO - Found 9 tables in Access database
2025-05-26 20:30:18,297 - INFO - Processing table: Customers
2025-05-26 20:30:18,301 - INFO - Created table 'Customers' in SQLite
2025-05-26 20:30:18,306 - INFO - Transferred 8 rows to 'Customers' (total: 8)
2025-05-26 20:30:18,307 - INFO - Completed transfer for table 'Customers'. Total rows: 8
2025-05-26 20:30:18,308 - INFO - Processing table: 

In [72]:
import sqlite3
import pyodbc
import pandas as pd
import matplotlib.pyplot as plt
from texttable import Texttable
from datetime import datetime
import logging
import os

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('db_reconciliation.log'),
        logging.StreamHandler()
    ]
)

class DatabaseReconciler:
    def __init__(self, access_db_path: str, sqlite_db_path: str):
        self.access_db_path = os.path.abspath(access_db_path)
        self.sqlite_db_path = os.path.abspath(sqlite_db_path)
        self.access_conn = None
        self.sqlite_conn = None
        self.report_data = []
        
        # Verify databases exist
        if not os.path.exists(self.access_db_path):
            raise FileNotFoundError(f"Access database not found: {self.access_db_path}")
        if not os.path.exists(self.sqlite_db_path):
            raise FileNotFoundError(f"SQLite database not found: {self.sqlite_db_path}")

    def connect_databases(self):
        """Establish connections to both databases"""
        try:
            # Connect to Access
            driver = 'Microsoft Access Driver (*.mdb, *.accdb)'
            conn_str = f'DRIVER={{{driver}}};DBQ={self.access_db_path};'
            self.access_conn = pyodbc.connect(conn_str)
            
            # Connect to SQLite
            self.sqlite_conn = sqlite3.connect(self.sqlite_db_path)
            
            logging.info("Successfully connected to both databases")
        except Exception as e:
            logging.error(f"Failed to connect to databases: {e}")
            raise

    def get_table_names(self, db_type: str) -> list:
        """Get list of tables in specified database"""
        if db_type == 'access':
            cursor = self.access_conn.cursor()
            tables = cursor.tables(tableType='TABLE')
            return [table.table_name for table in tables]
        elif db_type == 'sqlite':
            cursor = self.sqlite_conn.cursor()
            cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
            return [row[0] for row in cursor.fetchall()]
        else:
            raise ValueError("Invalid database type specified")

    def get_row_count(self, db_type: str, table_name: str) -> int:
        """Get row count for a specific table"""
        try:
            if db_type == 'access':
                cursor = self.access_conn.cursor()
                cursor.execute(f'SELECT COUNT(*) FROM "{table_name}"')
                return cursor.fetchone()[0]
            elif db_type == 'sqlite':
                cursor = self.sqlite_conn.cursor()
                cursor.execute(f'SELECT COUNT(*) FROM "{table_name}"')
                return cursor.fetchone()[0]
        except Exception as e:
            logging.warning(f"Error counting rows in {table_name}: {e}")
            return -1

    def compare_schemas(self, table_name: str) -> dict:
        """Compare table schemas between Access and SQLite"""
        result = {
            'table': table_name,
            'columns_match': False,
            'missing_columns': [],
            'type_mismatches': []
        }
        
        try:
            # Get Access schema
            access_cursor = self.access_conn.cursor()
            access_columns = access_cursor.columns(table=table_name)
            access_schema = {col.column_name: col.type_name for col in access_columns}
            
            # Get SQLite schema
            sqlite_cursor = self.sqlite_conn.cursor()
            sqlite_cursor.execute(f"PRAGMA table_info('{table_name}')")
            sqlite_schema = {row[1]: row[2] for row in sqlite_cursor.fetchall()}
            
            # Compare
            missing_columns = set(access_schema.keys()) - set(sqlite_schema.keys())
            if missing_columns:
                result['missing_columns'] = list(missing_columns)
            
            type_mismatches = []
            for col in access_schema:
                if col in sqlite_schema:
                    # Simple type comparison - you might want to enhance this
                    if access_schema[col].upper() != sqlite_schema[col].upper():
                        type_mismatches.append({
                            'column': col,
                            'access_type': access_schema[col],
                            'sqlite_type': sqlite_schema[col]
                        })
            
            result['type_mismatches'] = type_mismatches
            result['columns_match'] = not (missing_columns or type_mismatches)
            
        except Exception as e:
            logging.warning(f"Error comparing schemas for {table_name}: {e}")
            result['error'] = str(e)
        
        return result

    def sample_and_compare_data(self, table_name: str, sample_size: int = 10) -> dict:
        """Compare sample data between databases with Access-compatible sampling"""
        result = {
            'table': table_name,
            'sample_matches': 0,
            'sample_mismatches': 0,
            'sample_errors': 0,
            'sample_results': []
        }
        
        try:
            # Get column names
            access_cursor = self.access_conn.cursor()
            access_cursor.execute(f'SELECT * FROM "{table_name}" WHERE 1=0')
            columns = [column[0] for column in access_cursor.description]
            
            # Get total row count for sampling
            total_rows = self.get_row_count('access', table_name)
            if total_rows <= 0:
                logging.warning(f"Table {table_name} is empty or inaccessible")
                return result
            
            # Get sample from Access using TOP instead of LIMIT
            if total_rows <= sample_size:
                # Small table - take all rows
                access_cursor.execute(f'SELECT TOP {sample_size} * FROM "{table_name}"')
            else:
                # Use RND with record number for sampling
                access_cursor.execute(f'''
                    SELECT TOP {sample_size} * FROM "{table_name}" 
                    ORDER BY RND(CLNG([{columns[0]}]))
                ''')
            
            access_sample = access_cursor.fetchall()
            
            # Get corresponding rows from SQLite
            sqlite_cursor = self.sqlite_conn.cursor()
            
            for i, access_row in enumerate(access_sample):
                try:
                    # Build WHERE clause using first 3 columns that have values
                    where_parts = []
                    params = []
                    used_cols = 0
                    
                    for j in range(len(columns)):
                        if access_row[j] is not None:  # Only use non-NULL columns for matching
                            where_parts.append(f'"{columns[j]}" = ?')
                            params.append(access_row[j])
                            used_cols += 1
                            if used_cols >= 3:  # Use up to 3 columns for matching
                                break
                    
                    if not where_parts:
                        result['sample_errors'] += 1
                        result['sample_results'].append({
                            'sample_id': i+1,
                            'error': "No non-NULL columns found for matching"
                        })
                        continue
                    
                    where_clause = ' AND '.join(where_parts)
                    sqlite_cursor.execute(
                        f'SELECT * FROM "{table_name}" WHERE {where_clause}',
                        params
                    )
                    sqlite_row = sqlite_cursor.fetchone()
                    
                    # Compare rows
                    if sqlite_row is None:
                        match = False
                        diff = "Row not found in SQLite"
                    else:
                        match = (tuple(access_row) == tuple(sqlite_row))
                        if not match:
                            diff = {
                                'column': columns,
                                'access_values': access_row,
                                'sqlite_values': sqlite_row
                            }
                        else:
                            diff = None
                    
                    result['sample_results'].append({
                        'sample_id': i+1,
                        'match': match,
                        'diff': diff if not match else None
                    })
                    
                    if match:
                        result['sample_matches'] += 1
                    else:
                        result['sample_mismatches'] += 1
                
                except Exception as e:
                    result['sample_errors'] += 1
                    result['sample_results'].append({
                        'sample_id': i+1,
                        'error': str(e)
                    })
                    logging.warning(f"Error comparing sample {i+1} in {table_name}: {e}")
        
        except Exception as e:
            logging.warning(f"Error sampling data from {table_name}: {e}")
            result['error'] = str(e)
        
        return result

    def generate_reconciliation_report(self):
        """Generate comprehensive reconciliation report"""
        self.report_data = []
        access_tables = self.get_table_names('access')
        sqlite_tables = self.get_table_names('sqlite')
        
        # Check for missing tables
        missing_tables = set(access_tables) - set(sqlite_tables)
        extra_tables = set(sqlite_tables) - set(access_tables)
        
        for table in access_tables:
            if table in missing_tables:
                self.report_data.append({
                    'table': table,
                    'status': 'MISSING',
                    'access_rows': self.get_row_count('access', table),
                    'sqlite_rows': 0,
                    'schema_match': False,
                    'sample_match_rate': 0
                })
                continue
                
            # Get row counts
            access_rows = self.get_row_count('access', table)
            sqlite_rows = self.get_row_count('sqlite', table)
            
            # Compare schemas
            schema_result = self.compare_schemas(table)
            
            # Sample data comparison
            sample_result = self.sample_and_compare_data(table)
            sample_match_rate = (
                sample_result['sample_matches'] / 
                (sample_result['sample_matches'] + sample_result['sample_mismatches'])
                if (sample_result['sample_matches'] + sample_result['sample_mismatches']) > 0 
                else 0
            )
            
            self.report_data.append({
                'table': table,
                'status': 'COMPLETE' if access_rows == sqlite_rows else 'INCOMPLETE',
                'access_rows': access_rows,
                'sqlite_rows': sqlite_rows,
                'schema_match': schema_result['columns_match'],
                'sample_match_rate': sample_match_rate,
                'missing_columns': schema_result['missing_columns'],
                'type_mismatches': schema_result['type_mismatches'],
                'sample_results': sample_result['sample_results']
            })
        
        # Add extra tables found in SQLite
        for table in extra_tables:
            self.report_data.append({
                'table': table,
                'status': 'EXTRA',
                'access_rows': 0,
                'sqlite_rows': self.get_row_count('sqlite', table),
                'schema_match': False,
                'sample_match_rate': 0
            })

    def display_text_report(self):
        """Display formatted text report"""
        table = Texttable()
        table.set_deco(Texttable.HEADER)
        table.set_cols_align(["l", "l", "r", "r", "l", "r"])
        table.set_cols_width([20, 12, 12, 12, 8, 12])
        
        # Header
        table.add_row([
            "Table", 
            "Status", 
            "Access Rows", 
            "SQLite Rows", 
            "Schema", 
            "Sample Match"
        ])
        
        # Data
        for item in self.report_data:
            table.add_row([
                item['table'],
                item['status'],
                item['access_rows'],
                item['sqlite_rows'],
                "OK" if item.get('schema_match', False) else "DIFF",
                f"{item.get('sample_match_rate', 0)*100:.1f}%"
            ])
        
        print("\nDatabase Reconciliation Summary:")
        print(table.draw())
        
        # Print details for problematic tables
        print("\nDetailed Findings:")
        for item in self.report_data:
            if item['status'] != 'COMPLETE' or not item['schema_match'] or item.get('sample_match_rate', 1) < 1:
                print(f"\nTable: {item['table']}")
                print(f"Status: {item['status']}")
                print(f"Row counts: Access={item['access_rows']}, SQLite={item['sqlite_rows']}")
                
                if item.get('missing_columns'):
                    print(f"Missing columns: {', '.join(item['missing_columns'])}")
                
                if item.get('type_mismatches'):
                    print("Type mismatches:")
                    for mismatch in item['type_mismatches']:
                        print(f"  {mismatch['column']}: Access={mismatch['access_type']}, SQLite={mismatch['sqlite_type']}")
                
                if item.get('sample_match_rate', 1) < 1:
                    print(f"Sample match rate: {item['sample_match_rate']*100:.1f}%")
                    for sample in item.get('sample_results', []):
                        if not sample.get('match', True):
                            print(f"  Sample {sample['sample_id']} mismatch:")
                            if 'diff' in sample and isinstance(sample['diff'], dict):
                                for col, acc_val, sql_val in zip(
                                    sample['diff']['column'],
                                    sample['diff']['access_values'],
                                    sample['diff']['sqlite_values']
                                ):
                                    if acc_val != sql_val:
                                        print(f"    {col}: Access={acc_val}, SQLite={sql_val}")

    def generate_visualizations(self, output_dir='reports'):
        """Generate visualizations of reconciliation results"""
        if not os.path.exists(output_dir):
            os.makedirs(output_dir)
        
        # Create DataFrame for visualization
        df = pd.DataFrame(self.report_data)
        
        # 1. Migration Completeness Chart
        plt.figure(figsize=(10, 6))
        status_counts = df['status'].value_counts()
        plt.bar(status_counts.index, status_counts.values)
        plt.title('Table Migration Status Distribution')
        plt.ylabel('Number of Tables')
        plt.savefig(os.path.join(output_dir, 'migration_status.png'))
        plt.close()
        
        # 2. Row Count Comparison Chart
        plt.figure(figsize=(12, 8))
        df_filtered = df[df['status'].isin(['COMPLETE', 'INCOMPLETE'])]
        df_filtered = df_filtered.sort_values('access_rows', ascending=False)
        
        x = range(len(df_filtered))
        width = 0.35
        
        plt.bar(x, df_filtered['access_rows'], width, label='Access')
        plt.bar([i + width for i in x], df_filtered['sqlite_rows'], width, label='SQLite')
        
        plt.xlabel('Tables')
        plt.ylabel('Row Counts')
        plt.title('Row Count Comparison (Access vs SQLite)')
        plt.xticks([i + width/2 for i in x], df_filtered['table'], rotation=90)
        plt.legend()
        plt.tight_layout()
        plt.savefig(os.path.join(output_dir, 'row_count_comparison.png'))
        plt.close()
        
        # 3. Data Match Quality Heatmap
        plt.figure(figsize=(12, 6))
        df_filtered = df[~df['status'].isin(['MISSING', 'EXTRA'])]
        
        # Create match quality score (0-100)
        df_filtered['quality_score'] = (
            (df_filtered['schema_match'].astype(int) * 50) +
            (df_filtered['sample_match_rate'] * 50)
        )
        df_filtered = df_filtered.sort_values('quality_score')
        
        plt.barh(df_filtered['table'], df_filtered['quality_score'], color='skyblue')
        plt.xlabel('Data Quality Score (0-100)')
        plt.title('Migration Data Quality by Table')
        plt.xlim(0, 100)
        plt.grid(axis='x', alpha=0.3)
        plt.tight_layout()
        plt.savefig(os.path.join(output_dir, 'data_quality.png'))
        plt.close()

    def close_connections(self):
        """Close database connections"""
        if self.access_conn:
            self.access_conn.close()
        if self.sqlite_conn:
            self.sqlite_conn.close()

# def main():
#     import argparse
    
#     parser = argparse.ArgumentParser(description='Reconcile Access and SQLite databases')
#     parser.add_argument('access_db', help='Path to Access database file')
#     parser.add_argument('sqlite_db', help='Path to SQLite database file')
#     parser.add_argument('--output', help='Output directory for reports', default='reports')
    
#     args = parser.parse_args()
    
#     reconciler = DatabaseReconciler(args.access_db, args.sqlite_db)
    
#     try:
#         reconciler.connect_databases()
#         reconciler.generate_reconciliation_report()
#         reconciler.display_text_report()
#         reconciler.generate_visualizations(args.output)
        
#         print(f"\nReports and visualizations saved to: {args.output}")
#     finally:
#         reconciler.close_connections()

# if __name__ == '__main__':
#     main()

In [73]:
from datetime import datetime

start_time = datetime.now()
logging.info(f"Starting db recon at {start_time}")

access_db = r'C:\tmp\access_to_sqlite\Database1_be.accdb'  # Update path
sqlite_db = r'C:\tmp\access_to_sqlite\northwind_be.db'  # Update path
# exclude = ['Table1', 'Table2']  # Example tables to exclude
exclude = []  # No tables excluded in this example

output = r'C:\tmp\access_to_sqlite\reports'  # Directory for reports
reconciler = DatabaseReconciler(access_db, sqlite_db)

try:
    reconciler.connect_databases()
    reconciler.generate_reconciliation_report()
    reconciler.display_text_report()
    reconciler.generate_visualizations(output)
    
    print(f"\nReports and visualizations saved to: {output}")
finally:
    reconciler.close_connections()

end_time = datetime.now()
duration = end_time - start_time
logging.info(f"Recon completed successfully at {end_time}")
logging.info(f"Total duration: {duration}")

2025-05-26 20:30:18,530 - INFO - Starting db recon at 2025-05-26 20:30:18.530028
2025-05-26 20:30:18,652 - INFO - Successfully connected to both databases



Database Reconciliation Summary:
Table                  Status          Access Rows    SQLite Rows   Schema     Sample Match
Customers              COMPLETE                  8              8   DIFF               0.0%
Employees              COMPLETE                 11             11   DIFF               0.0%
NorthwindFeatures      COMPLETE                 34             34   DIFF             100.0%
OrderDetails           COMPLETE                 91             91   DIFF               0.0%
Orders                 COMPLETE                 39             39   DIFF               0.0%
OrderStatus            COMPLETE                  5              5   DIFF               0.0%
Products               COMPLETE                 43             43   DIFF               0.0%
SystemSettings         COMPLETE                  4              4   DIFF             100.0%
Welcome                MISSING                   1              0   DIFF               0.0%

Detailed Findings:

Table: Customers
Status: 

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df_filtered['quality_score'] = (
2025-05-26 20:30:19,141 - INFO - Recon completed successfully at 2025-05-26 20:30:19.141613
2025-05-26 20:30:19,142 - INFO - Total duration: 0:00:00.611585



Reports and visualizations saved to: C:\tmp\access_to_sqlite\reports
