In [4]:
import os
import json
import re
import logging
from typing import Dict, List, Any, Optional
from datetime import datetime
from dataclasses import dataclass, asdict
import pandas as pd
from openpyxl import Workbook
from openpyxl.styles import Font, PatternFill, Alignment
from sqlalchemy import create_engine, text
import openai
from openai import OpenAI
import docx2txt
import PyPDF2
from dateutil import parser as date_parser

In [13]:
# Set up logging.
# force=True (Python 3.8+) replaces any handlers already attached to the root logger;
# without it basicConfig is a silent no-op once the root logger has a handler, so
# re-running this cell in a live kernel would not apply the level/format below.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    force=True,
)
# Logger for the notebook-level pipeline code (classes use child loggers of __name__).
logger = logging.getLogger('ai_financial_extractor')


In [15]:
# Configuration
# Secrets are read from the environment and never hardcoded here: notebooks and their
# saved outputs get shared and committed. Database settings use the standard libpq
# variable names (PGHOST, PGPORT, PGDATABASE, PGUSER, PGPASSWORD); non-secret values
# keep their previous defaults. Any credentials that were ever stored in this cell
# should be treated as leaked and rotated.
CONFIG = {
    "database": {
        "type": "postgresql",
        "host": os.environ.get("PGHOST", "localhost"),
        "port": int(os.environ.get("PGPORT", "5432")),
        "database": os.environ.get("PGDATABASE", "financial_data"),
        "user": os.environ.get("PGUSER", "postgres"),
        "password": os.environ.get("PGPASSWORD", ""),
    },
    "openai": {
        # None when unset; the OpenAI client then falls back to OPENAI_API_KEY itself.
        "api_key": os.environ.get("OPENAI_API_KEY"),
        "model": "gpt-4o",
        # Low temperature keeps the extraction output close to deterministic.
        "temperature": 0.1,
    },
    "output": {
        "excel_file": "ai_extracted_financial_data.xlsx"
    }
}

In [17]:
@dataclass
class FinancialRecord:
    """Data class representing a financial investment record.

    One row of extracted data. Every field is optional and defaults to None, so a
    record can be built from whatever subset of fields was found in a document.
    Instances are created from keyword arguments whose names match these fields,
    and ``asdict()`` of an instance becomes one row of the stored table/sheet.
    NOTE: field order matters for positional construction; append new fields at the end.
    """
    # Reporting date as a string; the extractor reformats it to MM/DD/YYYY when it can parse it.
    as_of_date: Optional[str] = None
    # Security / instrument name as it appears in the source document.
    original_security_name: Optional[str] = None
    # Monetary amounts as plain floats (currency symbols and separators are stripped during cleaning).
    investment_in_original: Optional[float] = None
    investment_in: Optional[float] = None
    investment_in_prior: Optional[float] = None
    # Currency code as returned by the model (the prompt asks for a three-letter code, e.g. USD).
    currency: Optional[str] = None
    # Additional fields (not part of the processor's mandatory-field statistics)
    sector: Optional[str] = None
    risk_rating: Optional[str] = None
    # Maturity date string; reformatted to MM/DD/YYYY when parseable, like as_of_date.
    maturity_date: Optional[str] = None
    # Yield as a number without the % sign, e.g. 4.25 for 4.25%.
    yield_percentage: Optional[float] = None
    # Security identifiers, stored as strings when present.
    isin: Optional[str] = None
    cusip: Optional[str] = None
    asset_class: Optional[str] = None
    country: Optional[str] = None
    region: Optional[str] = None

In [19]:
class AIDocumentExtractor:
    """Extract financial data from documents using OpenAI's API.

    Pipeline: read the file into plain text (``extract_text_from_file``), send it
    to the chat model with an extraction prompt, then parse the model's JSON reply
    and normalize every record (``_parse_ai_response`` / ``_clean_record``) so its
    keys match the ``FinancialRecord`` field names.
    """

    # Alternative key spellings the model may use, mapped to canonical field names.
    _FIELD_MAPPING = {
        'security_name': 'original_security_name',
        'instrument_name': 'original_security_name',
        'asset_name': 'original_security_name',
        'original_investment': 'investment_in_original',
        'initial_investment': 'investment_in_original',
        'market_value': 'investment_in',
        'current_value': 'investment_in',
        'previous_value': 'investment_in_prior',
        'prior_value': 'investment_in_prior',
        'yield': 'yield_percentage',
        'yield_rate': 'yield_percentage',
        'currency_code': 'currency',
        'risk_level': 'risk_rating'
    }
    # Fields coerced to float by _clean_record.
    _NUMERIC_FIELDS = ('investment_in_original', 'investment_in', 'investment_in_prior', 'yield_percentage')
    # Fields reformatted to MM/DD/YYYY by _clean_record.
    _DATE_FIELDS = ('as_of_date', 'maturity_date')
    # Placeholder strings that are kept verbatim in date fields instead of being parsed.
    _DATE_PLACEHOLDERS = ('n/a', 'na', 'none')
    # A bare number such as "4.25", "-2" or "1.5e3" (no symbols or separators).
    _PLAIN_NUMBER_RE = re.compile(r'[+-]?(?:\d+(?:\.\d*)?|\.\d+)(?:[eE][+-]?\d+)?')

    def __init__(self, openai_api_key: str, model: str = "gpt-4o", temperature: Optional[float] = None):
        """
        Args:
            openai_api_key: key passed to the OpenAI client.
            model: chat model name.
            temperature: sampling temperature; when None, CONFIG["openai"]["temperature"]
                is read at call time (the previous, and still default, behavior).
        """
        self.client = OpenAI(api_key=openai_api_key)
        self.model = model
        self.temperature = temperature
        self.logger = logging.getLogger(f'{__name__}.AIDocumentExtractor')

    def extract_text_from_file(self, file_path: str) -> str:
        """Read ``file_path`` and return its contents as plain text for the prompt.

        Supported extensions (case-insensitive): .docx, .pdf, .txt, .csv, .json.
        CSV is rendered as a pandas table string and JSON is re-serialized with indentation.

        Raises:
            ValueError: for any other extension.
            Exception: any read/parse error is logged and re-raised unchanged.
        """
        file_extension = os.path.splitext(file_path)[1].lower()

        try:
            if file_extension == ".docx":
                return docx2txt.process(file_path)
            elif file_extension == ".pdf":
                with open(file_path, "rb") as file:
                    pdf_reader = PyPDF2.PdfReader(file)
                    # extract_text() may return None or "" for pages without a text layer;
                    # join with newlines so words on adjacent pages are not glued together.
                    return "\n".join(page.extract_text() or "" for page in pdf_reader.pages)
            elif file_extension == ".txt":
                with open(file_path, "r", encoding="utf-8", errors="replace") as file:
                    return file.read()
            elif file_extension == ".csv":
                # Render the table as text so the model sees headers and rows aligned.
                df = pd.read_csv(file_path)
                return df.to_string()
            elif file_extension == ".json":
                with open(file_path, "r", encoding="utf-8") as file:
                    data = json.load(file)
                return json.dumps(data, indent=2)
            else:
                raise ValueError(f"Unsupported file format: {file_extension}")
        except Exception as e:
            self.logger.error(f"Error extracting text from {file_path}: {str(e)}")
            raise

    def extract_financial_data(self, file_path: str) -> List[Dict[str, Any]]:
        """Extract cleaned record dicts from ``file_path`` with one chat-completion call.

        Returns an empty list without calling the API when the file yields no text.
        Errors from the API call or response processing are logged and re-raised.
        """
        self.logger.info(f"Starting AI extraction for {file_path}")

        document_text = self.extract_text_from_file(file_path) or ""
        if not document_text.strip():
            # Nothing to analyze (e.g. a scanned PDF without a text layer); sending an
            # empty document would only invite hallucinated records.
            self.logger.warning(f"No text could be extracted from {file_path}; skipping AI call")
            return []

        prompt = self._create_extraction_prompt(document_text)
        temperature = self.temperature if self.temperature is not None else CONFIG["openai"]["temperature"]

        try:
            response = self.client.chat.completions.create(
                model=self.model,
                messages=[
                    {"role": "system", "content": "You are an expert financial data analyst specializing in extracting structured data from financial documents."},
                    {"role": "user", "content": prompt}
                ],
                temperature=temperature
            )

            # Message content may be None; treat that as an empty reply.
            extracted_data = self._parse_ai_response(response.choices[0].message.content or "")

            self.logger.info(f"Successfully extracted {len(extracted_data)} records using AI")
            return extracted_data

        except Exception as e:
            self.logger.error(f"Error in AI extraction: {str(e)}")
            raise

    def _create_extraction_prompt(self, document_text: str) -> str:
        """Create a detailed prompt for the AI to extract financial data"""
        prompt = f"""
Please analyze the following financial document and extract all investment records. 
Return the data as a JSON array where each object represents one investment record.

For each record, extract the following fields (if available):
- as_of_date: The date when the data was recorded (format as MM/DD/YYYY)
- original_security_name: The name of the investment/security
- investment_in_original: Original investment amount (as a number, no currency symbols)
- investment_in: Current investment value (as a number, no currency symbols)
- investment_in_prior: Previous period investment value (as a number, no currency symbols)
- currency: Three-letter currency code (e.g., USD, EUR)
- sector: Investment sector (e.g., Technology, Government)
- risk_rating: Risk level (e.g., Low, Moderate, High)
- maturity_date: When the investment matures (format as MM/DD/YYYY)
- yield_percentage: Yield or return percentage (as a number without % sign)
- isin: ISIN code if available
- cusip: CUSIP code if available
- asset_class: Type of asset (e.g., Bond, Equity)
- country: Country of origin
- region: Geographic region

IMPORTANT RULES:
1. Only include fields that are actually present in the document
2. Convert all dates to MM/DD/YYYY format
3. Extract numeric values without currency symbols or commas
4. If a field is not found, omit it from the record
5. Look for variations in field names (e.g., "Market Value" might mean "investment_in")
6. Return valid JSON only, no additional text

Document to analyze:

{document_text}
"""
        return prompt

    def _parse_ai_response(self, response_text: str) -> List[Dict[str, Any]]:
        """Parse the model's reply into a list of cleaned record dicts.

        The outermost ``[...]`` span is parsed as JSON, so prose or ```json fences
        around the array are ignored. If that fails, flat JSON objects are salvaged
        with ``_extract_json_fallback``. Either way every record goes through
        ``_clean_record``; non-object entries are skipped.
        """
        response_text = response_text or ""
        try:
            json_start = response_text.find('[')
            json_end = response_text.rfind(']') + 1
            extracted_data = json.loads(response_text[json_start:json_end])
        except json.JSONDecodeError as e:
            self.logger.error(f"Error parsing AI response as JSON: {str(e)}")
            self.logger.debug(f"Raw response: {response_text}")
            extracted_data = self._extract_json_fallback(response_text)

        try:
            return self._clean_records(extracted_data)
        except Exception as e:
            self.logger.error(f"Error processing AI response: {str(e)}")
            raise

    def _clean_records(self, items: Any) -> List[Dict[str, Any]]:
        """Clean every dict in ``items`` (a single dict is treated as a one-element list)."""
        if isinstance(items, dict):
            items = [items]
        if not isinstance(items, list):
            return []
        cleaned_data = []
        for item in items:
            if not isinstance(item, dict):
                self.logger.warning(f"Skipping non-object entry in AI response: {item!r}")
                continue
            cleaned_record = self._clean_record(item)
            if cleaned_record:
                cleaned_data.append(cleaned_record)
        return cleaned_data

    def _extract_json_fallback(self, text: str) -> List[Dict[str, Any]]:
        """Salvage flat JSON objects from a reply that is not a valid JSON array.

        Only objects without nested braces are recognized; fragments that still fail
        to parse are skipped. Returns raw (not yet cleaned) dicts, or [] if none.
        """
        if not isinstance(text, str) or not text:
            return []
        results = []
        for obj_str in re.findall(r'\{[^{}]*\}', text):
            try:
                results.append(json.loads(obj_str))
            except json.JSONDecodeError:
                continue
        return results

    def _normalize_key(self, key: Any) -> str:
        """Lower-case ``key``, turn spaces/hyphens into underscores, and apply synonyms."""
        base = re.sub(r'[\s\-]+', '_', str(key).strip().lower())
        return self._FIELD_MAPPING.get(base, base)

    def _to_float(self, value: Any) -> Optional[float]:
        """Convert a numeric-looking value to a finite float, or None if it cannot be parsed.

        Real numbers are converted directly (so scientific notation survives). Strings
        that are not a bare number have everything except digits, '.' and '-' stripped
        (currency symbols, thousands separators, '%'); an amount wrapped in parentheses,
        e.g. "(1,250.00)", is read as negative per the accounting convention.
        """
        import math  # local: only needed for the finiteness check

        if isinstance(value, bool):
            return None
        if isinstance(value, (int, float)):
            try:
                number = float(value)
            except OverflowError:
                return None
        else:
            text_value = str(value).strip()
            if self._PLAIN_NUMBER_RE.fullmatch(text_value):
                number = float(text_value)
            else:
                digits = re.sub(r'[^\d.-]', '', text_value)
                try:
                    number = float(digits)
                except ValueError:
                    return None
                if text_value.startswith('(') and text_value.endswith(')'):
                    number = -abs(number)
        return number if math.isfinite(number) else None

    def _to_date(self, value: Any) -> Any:
        """Reformat a date string to MM/DD/YYYY; return ``value`` unchanged if it can't be parsed."""
        if not isinstance(value, str) or value.lower() in self._DATE_PLACEHOLDERS:
            return value
        try:
            return date_parser.parse(value, fuzzy=True).strftime('%m/%d/%Y')
        except (ValueError, OverflowError):
            return value

    def _clean_record(self, record: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Normalize keys and values of one extracted record.

        None and "" values are dropped. Numeric fields become floats (None when
        unparseable), date fields become MM/DD/YYYY when parseable, and everything
        else becomes a stripped string. Returns None if no field survives.
        """
        cleaned: Dict[str, Any] = {}
        for key, value in record.items():
            if value is None or value == "":
                continue
            clean_key = self._normalize_key(key)
            if clean_key in self._NUMERIC_FIELDS:
                cleaned[clean_key] = self._to_float(value)
            elif clean_key in self._DATE_FIELDS:
                cleaned[clean_key] = self._to_date(value)
            else:
                cleaned[clean_key] = str(value).strip()

        # Only return records with at least one meaningful field
        return cleaned if cleaned else None

In [21]:
class AIDataProcessor:
    """Convert cleaned extraction dicts into FinancialRecord objects and compute quality statistics."""

    def __init__(self):
        self.logger = logging.getLogger(f'{__name__}.AIDataProcessor')
        # Fields counted by the completeness and "extraction_accuracy" metrics.
        self.mandatory_fields = [
            'as_of_date', 'original_security_name', 'investment_in_original',
            'investment_in', 'investment_in_prior', 'currency'
        ]

    def process_data(self, raw_data: List[Dict[str, Any]]) -> List[FinancialRecord]:
        """Build one FinancialRecord per dict in ``raw_data``.

        Keys that are not FinancialRecord fields are ignored (with a warning) instead
        of causing the whole record to be discarded; non-dict entries are skipped.
        """
        from dataclasses import fields  # only `dataclass` and `asdict` are imported at file level

        known_fields = {f.name for f in fields(FinancialRecord)}
        records = []

        for item in raw_data:
            if not isinstance(item, dict):
                self.logger.warning(f"Error creating record from {item}: expected a dict, got {type(item).__name__}")
                continue
            # One unexpected key (e.g. an extra field the model invented) would make
            # FinancialRecord(**item) raise TypeError and lose the whole record.
            unknown = sorted(str(key) for key in item if key not in known_fields)
            if unknown:
                self.logger.warning(f"Ignoring unknown fields {unknown} in record {item}")
            records.append(FinancialRecord(**{k: v for k, v in item.items() if k in known_fields}))

        self.logger.info(f"Processed {len(records)} records")
        return records

    def calculate_statistics(self, records: List[FinancialRecord]) -> Dict[str, Any]:
        """Summarize field completeness and consistency of ``records``.

        Returns a dict with:
            total_records: number of records.
            extraction_accuracy: percentage of mandatory field slots that are non-None
                (a completeness measure, not a correctness check against the source).
            mandatory_field_completeness: {field: {'count', 'percentage'}}.
            field_presence: {field: number of records where it is non-None}.
            missing_fields: "<field> (<n> missing)" for incomplete mandatory fields.
            inconsistent_data: messages about mixed currencies / date formats.
        """
        total_records = len(records)
        if total_records == 0:
            return {
                'total_records': 0,
                'extraction_accuracy': 0,
                'mandatory_field_completeness': {},
                'field_presence': {},
                'missing_fields': [],
                'inconsistent_data': []
            }

        # Count field presence
        field_counts = {}
        for record in records:
            for field, value in asdict(record).items():
                if value is not None:
                    field_counts[field] = field_counts.get(field, 0) + 1

        # Calculate mandatory field completeness
        mandatory_completeness = {}
        missing_fields = []
        for field in self.mandatory_fields:
            count = field_counts.get(field, 0)
            mandatory_completeness[field] = {
                'count': count,
                'percentage': (count / total_records) * 100
            }
            if count < total_records:
                missing_fields.append(f"{field} ({total_records - count} missing)")

        # Check for inconsistencies
        inconsistencies = []
        currencies = set()
        date_formats = set()

        for record in records:
            if record.currency:
                currencies.add(str(record.currency))
            if record.as_of_date:
                date_formats.add(self._identify_date_format(record.as_of_date))

        # sorted() keeps the messages stable between runs (set iteration order is not).
        if len(currencies) > 1:
            inconsistencies.append(f"Multiple currencies: {', '.join(sorted(currencies))}")
        if len(date_formats) > 1:
            inconsistencies.append(f"Multiple date formats: {', '.join(sorted(date_formats))}")

        # Calculate overall accuracy
        total_mandatory_fields = len(self.mandatory_fields) * total_records
        filled_mandatory_fields = sum(counts['count'] for counts in mandatory_completeness.values())
        accuracy = (filled_mandatory_fields / total_mandatory_fields) * 100 if total_mandatory_fields > 0 else 0

        return {
            'total_records': total_records,
            'extraction_accuracy': accuracy,
            'mandatory_field_completeness': mandatory_completeness,
            'field_presence': field_counts,
            'missing_fields': missing_fields,
            'inconsistent_data': inconsistencies
        }

    def _identify_date_format(self, date_str: str) -> str:
        """Classify a date value as one of a few known layouts, else "Unknown format".

        The whole (stripped) value must match, so e.g. "03/31/20245" or a date with a
        trailing time is reported as "Unknown format". Non-string values are
        stringified first instead of raising TypeError.
        """
        value = str(date_str).strip()
        if re.fullmatch(r'\d{1,2}/\d{1,2}/\d{4}', value):
            return "MM/DD/YYYY"
        elif re.fullmatch(r'\d{4}-\d{1,2}-\d{1,2}', value):
            return "YYYY-MM-DD"
        elif re.fullmatch(r'[A-Za-z]+ \d{1,2},?\s+\d{4}', value):
            return "Month DD, YYYY"
        return "Unknown format"

In [23]:
class AIDataStorage:
    """Store processed data in database and Excel.

    ``store_in_database`` replaces the ``ai_financial_data`` table and its
    ``ai_financial_data_stats`` view in PostgreSQL; ``store_in_excel`` (over)writes
    ``excel_file`` with an "Extracted Data" sheet and a "Statistics" sheet.
    Both methods log failures and return False instead of raising.
    """

    def __init__(self, db_config: Dict[str, Any], excel_file: str):
        """
        Args:
            db_config: mapping with 'user', 'password', 'host', 'port' and 'database' keys.
            excel_file: path of the workbook to write.
        """
        self.db_config = db_config
        self.excel_file = excel_file
        self.logger = logging.getLogger(f'{__name__}.AIDataStorage')

    def _records_to_frame(self, records: List[FinancialRecord]) -> pd.DataFrame:
        """One row per record and one column per FinancialRecord field, even when ``records`` is empty."""
        from dataclasses import fields  # only `dataclass` and `asdict` are imported at file level

        columns = [f.name for f in fields(FinancialRecord)]
        return pd.DataFrame([asdict(record) for record in records], columns=columns)

    def store_in_database(self, records: List[FinancialRecord]) -> bool:
        """Replace the ``ai_financial_data`` table with ``records`` and rebuild the stats view.

        Returns True on success, False (after logging) on any error.
        """
        from sqlalchemy.engine import URL  # local: only this method builds connection URLs

        engine = None
        try:
            df = self._records_to_frame(records)

            # URL.create escapes credentials; a string-built URL breaks on passwords
            # containing characters such as '@', ':' or '/'.
            port = self.db_config['port']
            url = URL.create(
                "postgresql",
                username=self.db_config['user'],
                password=self.db_config['password'],
                host=self.db_config['host'],
                port=int(port) if port is not None else None,
                database=self.db_config['database'],
            )
            engine = create_engine(url)

            # Drop the dependent view before the table it selects from.
            with engine.begin() as connection:
                connection.execute(text("DROP VIEW IF EXISTS ai_financial_data_stats CASCADE;"))
                connection.execute(text("DROP TABLE IF EXISTS ai_financial_data CASCADE;"))

            df.to_sql('ai_financial_data', engine, if_exists='replace', index=False)
            self._create_stats_view(engine)

            self.logger.info(f"Successfully stored {len(records)} records in database")
            return True

        except Exception as e:
            self.logger.error(f"Database storage error: {str(e)}")
            return False
        finally:
            # Release pooled connections; a new engine is created on every call.
            if engine is not None:
                engine.dispose()

    def _create_stats_view(self, engine):
        """Create (or replace) the ``ai_financial_data_stats`` view of per-field non-null counts."""
        view_sql = """
        CREATE OR REPLACE VIEW ai_financial_data_stats AS
        SELECT
            COUNT(*) AS total_records,
            SUM(CASE WHEN as_of_date IS NOT NULL THEN 1 ELSE 0 END) AS as_of_date_count,
            SUM(CASE WHEN original_security_name IS NOT NULL THEN 1 ELSE 0 END) AS original_security_name_count,
            SUM(CASE WHEN investment_in_original IS NOT NULL THEN 1 ELSE 0 END) AS investment_in_original_count,
            SUM(CASE WHEN investment_in IS NOT NULL THEN 1 ELSE 0 END) AS investment_in_count,
            SUM(CASE WHEN investment_in_prior IS NOT NULL THEN 1 ELSE 0 END) AS investment_in_prior_count,
            SUM(CASE WHEN currency IS NOT NULL THEN 1 ELSE 0 END) AS currency_count,
            COUNT(DISTINCT currency) AS distinct_currencies
        FROM ai_financial_data;
        """

        with engine.begin() as connection:
            connection.execute(text(view_sql))

    def store_in_excel(self, records: List[FinancialRecord], stats: Dict[str, Any]) -> bool:
        """Write ``records`` and ``stats`` (as produced by AIDataProcessor) to ``excel_file``.

        Returns True on success, False (after logging) on any error.
        """
        try:
            df = self._records_to_frame(records)

            with pd.ExcelWriter(self.excel_file, engine='openpyxl') as writer:
                df.to_excel(writer, sheet_name='Extracted Data', index=False)

                stats_data = {
                    'Metric': [
                        'Total Records',
                        'Overall Extraction Accuracy (%)',
                        'Missing Fields',
                        'Inconsistent Data'
                    ],
                    'Value': [
                        stats['total_records'],
                        f"{stats['extraction_accuracy']:.2f}%",
                        ', '.join(stats['missing_fields']) if stats['missing_fields'] else 'None',
                        ', '.join(stats['inconsistent_data']) if stats['inconsistent_data'] else 'None'
                    ]
                }

                # One extra row per mandatory field: "count/total (pct%)".
                for field, completeness in stats['mandatory_field_completeness'].items():
                    stats_data['Metric'].append(f'{field} completeness')
                    stats_data['Value'].append(f"{completeness['count']}/{stats['total_records']} ({completeness['percentage']:.1f}%)")

                pd.DataFrame(stats_data).to_excel(writer, sheet_name='Statistics', index=False)

                self._format_excel(writer)

            self.logger.info(f"Successfully stored data in Excel file: {self.excel_file}")
            return True

        except Exception as e:
            self.logger.error(f"Excel storage error: {str(e)}")
            return False

    def _format_excel(self, writer):
        """Bold/grey header rows and size columns to their longest value (minimum width 12)."""
        workbook = writer.book
        header_fill = PatternFill(start_color="DDDDDD", end_color="DDDDDD", fill_type="solid")

        for sheet_name in ['Extracted Data', 'Statistics']:
            worksheet = workbook[sheet_name]

            for cell in worksheet[1]:
                cell.font = Font(bold=True)
                cell.fill = header_fill
                cell.alignment = Alignment(horizontal='center')

            for column in worksheet.columns:
                max_length = max(
                    (len(str(cell.value)) for cell in column if cell.value is not None),
                    default=0,
                )
                worksheet.column_dimensions[column[0].column_letter].width = max(max_length + 2, 12)

In [25]:
def create_sample_financial_document(doc_type: str = "txt", directory: Optional[str] = None) -> str:
    """Write a sample financial document for testing and return its path.

    The three formats describe the same three investments.

    Args:
        doc_type: "txt" (free-form report), "csv" (one row per investment) or
            "json" (nested report with an "investments" list).
        directory: folder to write into; defaults to the current working directory,
            in which case the bare filename is returned (the original behavior).

    Raises:
        ValueError: if ``doc_type`` is not one of the supported formats.
    """
    if doc_type == "txt":
        content = """Financial Investment Report
Date: March 31, 2024

PORTFOLIO SUMMARY

Investment 1:
Security Name: Apple Inc. Common Stock
Original Investment: $50,000.00
Current Market Value: $57,500.00
Prior Quarter Value: $52,000.00
Currency: USD
Sector: Technology
Risk Rating: Moderate
Yield: 0.50%

Investment 2:
Security Name: US Treasury Bond 2030
Original Investment: $100,000.00
Current Market Value: $98,500.00
Prior Quarter Value: $99,200.00
Currency: USD
Sector: Government
Risk Rating: Low
Maturity Date: 12/31/2030
Yield: 4.25%

Investment 3:
Security Name: Emerging Markets ETF
Original Investment: $25,000.00
Current Market Value: $27,800.00
Prior Quarter Value: $26,100.00
Currency: USD
Sector: International
Risk Rating: High
Yield: 2.85%
"""
        filename = "sample_financial_report.txt"

    elif doc_type == "csv":
        content = """Security Name,Original Investment,Current Value,Prior Value,Currency,Sector,Risk Rating,Yield %
Apple Inc. Common Stock,50000,57500,52000,USD,Technology,Moderate,0.50
US Treasury Bond 2030,100000,98500,99200,USD,Government,Low,4.25
Emerging Markets ETF,25000,27800,26100,USD,International,High,2.85
"""
        filename = "sample_financial_data.csv"

    elif doc_type == "json":
        data = {
            "report_date": "2024-03-31",
            "investments": [
                {
                    "security_name": "Apple Inc. Common Stock",
                    "original_investment": 50000,
                    "current_value": 57500,
                    "prior_value": 52000,
                    "currency": "USD",
                    "sector": "Technology",
                    "risk_rating": "Moderate",
                    "yield_percentage": 0.50
                },
                {
                    "security_name": "US Treasury Bond 2030",
                    "original_investment": 100000,
                    "current_value": 98500,
                    "prior_value": 99200,
                    "currency": "USD",
                    "sector": "Government",
                    "risk_rating": "Low",
                    "maturity_date": "2030-12-31",
                    "yield_percentage": 4.25
                },
                {
                    "security_name": "Emerging Markets ETF",
                    "original_investment": 25000,
                    "current_value": 27800,
                    "prior_value": 26100,
                    "currency": "USD",
                    "sector": "International",
                    "risk_rating": "High",
                    "yield_percentage": 2.85
                }
            ]
        }
        content = json.dumps(data, indent=2)
        filename = "sample_financial_data.json"

    else:
        # Previously fell through to an UnboundLocalError on `filename`.
        raise ValueError(f"Unsupported sample document type: {doc_type!r} (expected 'txt', 'csv' or 'json')")

    path = os.path.join(directory, filename) if directory else filename
    with open(path, 'w', encoding='utf-8') as f:
        f.write(content)

    return path

In [27]:
def main_ai_extraction(file_path: str, openai_api_key: str) -> bool:
    """Run the full pipeline on one document: AI extraction -> records/statistics -> DB + Excel.

    Args:
        file_path: document to process (any format AIDocumentExtractor supports).
        openai_api_key: key used to construct the OpenAI client.

    Returns:
        True only if both the database and the Excel writes succeeded. False if
        either storage step failed or if any exception was raised along the way
        (logged together with its traceback).
    """
    logger.info(f"Starting AI-powered extraction for {file_path}")

    try:
        extractor = AIDocumentExtractor(openai_api_key, CONFIG["openai"]["model"])
        raw_data = extractor.extract_financial_data(file_path)

        processor = AIDataProcessor()
        records = processor.process_data(raw_data)
        if not records:
            # Storage still runs (and replaces previous output), but flag the empty result.
            logger.warning(f"No records were extracted from {file_path}")
        stats = processor.calculate_statistics(records)

        storage = AIDataStorage(CONFIG["database"], CONFIG["output"]["excel_file"])
        db_success = storage.store_in_database(records)
        excel_success = storage.store_in_excel(records, stats)

        # Print results
        logger.info("\n=== AI EXTRACTION RESULTS ===")
        logger.info(f"Total Records Extracted: {stats['total_records']}")
        logger.info(f"Extraction Accuracy: {stats['extraction_accuracy']:.2f}%")
        logger.info(f"Database Storage: {'Success' if db_success else 'Failed'}")
        logger.info(f"Excel Storage: {'Success' if excel_success else 'Failed'}")

        if stats['missing_fields']:
            logger.info(f"Missing Fields: {', '.join(stats['missing_fields'])}")

        if stats['inconsistent_data']:
            logger.info(f"Inconsistencies: {', '.join(stats['inconsistent_data'])}")

        return db_success and excel_success

    except Exception as e:
        # logger.exception also records the traceback, which is essential to debug
        # failures swallowed by this catch-all.
        logger.exception(f"Error in AI extraction pipeline: {str(e)}")
        return False

In [33]:
# Example usage: write one sample document per supported format and run the
# full pipeline on each. (In a notebook kernel __name__ is "__main__", so this runs.)
if __name__ == "__main__":
    # Use the configured key, falling back to the OPENAI_API_KEY environment variable,
    # and stop early with one clear message instead of failing once per document.
    OPENAI_API_KEY = CONFIG["openai"]["api_key"] or os.environ.get("OPENAI_API_KEY")

    if not OPENAI_API_KEY:
        logger.error("No OpenAI API key configured: set OPENAI_API_KEY and re-run this cell")
    else:
        for doc_type in ["txt", "csv", "json"]:
            logger.info(f"\n{'='*50}")
            logger.info(f"Testing AI extraction with {doc_type.upper()} document")
            logger.info(f"{'='*50}")

            sample_file = create_sample_financial_document(doc_type)
            logger.info(f"Created sample {doc_type} file: {sample_file}")

            success = main_ai_extraction(sample_file, OPENAI_API_KEY)

            if success:
                logger.info(f"✅ AI extraction completed successfully for {doc_type}")
            else:
                logger.error(f"❌ AI extraction failed for {doc_type}")

2025-05-13 15:37:44,727 - ai_financial_extractor - INFO - 
2025-05-13 15:37:44,727 - ai_financial_extractor - INFO - Testing AI extraction with TXT document
2025-05-13 15:37:44,730 - ai_financial_extractor - INFO - Created sample txt file: sample_financial_report.txt
2025-05-13 15:37:44,730 - ai_financial_extractor - INFO - Starting AI-powered extraction for sample_financial_report.txt
2025-05-13 15:37:44,934 - __main__.AIDocumentExtractor - INFO - Starting AI extraction for sample_financial_report.txt
2025-05-13 15:37:48,080 - httpx - INFO - HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
2025-05-13 15:37:48,097 - __main__.AIDocumentExtractor - INFO - Successfully extracted 3 records using AI
2025-05-13 15:37:48,098 - __main__.AIDataProcessor - INFO - Processed 3 records
2025-05-13 15:37:48,149 - __main__.AIDataStorage - INFO - Successfully stored 3 records in database
2025-05-13 15:37:48,161 - __main__.AIDataStorage - INFO - Successfully stored data in