In [1]:
import os
import json
import re
import pandas as pd
import pdfplumber
from typing import List, Dict, Any, Optional

print("Libraries imported successfully.")

Libraries imported successfully.


# 1. 实现金融领域的专有名词标准化系统
Implement a Financial Domain Proper Noun Standardization System.

In [2]:
class FinancialTermStandardizer:
    """Rewrite known aliases of financial terms to their canonical spelling.

    Aliases are matched case-insensitively and applied one after another,
    longest alias first, so text produced by an earlier (longer) alias is
    visible to the later (shorter) ones.

    Args:
        term_mapping: Optional ``{alias: canonical}`` dictionary. It is copied,
            so later changes to the caller's dictionary do not affect this
            instance; use :meth:`add_term` to register more aliases.
    """

    # Code-point ranges of scripts that are written without spaces between
    # words. `\b` never fires between two such characters, so an alias that
    # starts or ends with one is matched without a boundary on that side.
    _UNSPACED_RANGES = (
        ('\u3040', '\u30ff'),  # Hiragana and Katakana
        ('\u3400', '\u4dbf'),  # CJK Unified Ideographs Extension A
        ('\u4e00', '\u9fff'),  # CJK Unified Ideographs
        ('\uf900', '\ufaff'),  # CJK Compatibility Ideographs
    )

    def __init__(self, term_mapping: Optional[Dict[str, str]] = None):
        self.term_mapping = dict(term_mapping) if term_mapping else {}
        self._refresh()

    def _refresh(self) -> None:
        """Re-sort the aliases (longest first) and drop the compiled-pattern cache."""
        self.sorted_keys = sorted(self.term_mapping, key=len, reverse=True)
        self._compiled = None

    def add_term(self, alias: str, canonical: str):
        """Register (or overwrite) a single ``alias -> canonical`` entry."""
        self.term_mapping[alias] = canonical
        self._refresh()

    @classmethod
    def _boundary(cls, edge_char: str) -> str:
        """Return the regex anchor to place next to ``edge_char`` of an alias.

        A word boundary is only meaningful next to a word character of a
        space-delimited script; next to punctuation or a CJK/kana character
        it would prevent legitimate matches, so no anchor is used there.
        """
        is_word_char = edge_char.isalnum() or edge_char == '_'
        if not is_word_char:
            return ''
        if any(low <= edge_char <= high for low, high in cls._UNSPACED_RANGES):
            return ''
        return r'\b'

    def _compile_patterns(self):
        """Build one compiled, case-insensitive pattern per usable alias."""
        compiled = []
        for alias in self.sorted_keys:
            # Skip single characters to avoid excessive false positives (like 'a', 'i')
            if len(alias) < 2:
                continue
            pattern = self._boundary(alias[0]) + re.escape(alias) + self._boundary(alias[-1])
            compiled.append((re.compile(pattern, re.IGNORECASE), self.term_mapping[alias]))
        return compiled

    def standardize(self, text: str) -> str:
        """Return ``text`` with every known alias replaced by its canonical form.

        Empty input is returned unchanged.
        """
        if not text:
            return text
        # Patterns are compiled lazily and reused across calls; with thousands
        # of aliases, recompiling them on every call dominates the runtime.
        if self._compiled is None:
            self._compiled = self._compile_patterns()

        result = text
        for pattern, canonical in self._compiled:
            # A callable replacement inserts the canonical term literally; a
            # plain string would be parsed for backslash escapes and group
            # references, breaking on terms that contain '\'.
            result = pattern.sub(lambda _match, replacement=canonical: replacement, result)
        return result

# Build the alias -> canonical mapping, starting from the term list on disk.
csv_path = 'data/万条金融标准术语.csv'
mapping = {}

if not os.path.exists(csv_path):
    print(f"Warning: {csv_path} not found. Using empty mapping.")
else:
    try:
        # Per the original file inspection the terms live in column 'A'
        # (with their type in 'FINTERM'); otherwise use the first column.
        df_terms = pd.read_csv(csv_path)
        has_term_column = 'A' in df_terms.columns
        if has_term_column:
            term_series = df_terms['A']
        else:
            print("Column 'A' not found in CSV. Checking first column.")
            term_series = df_terms.iloc[:, 0]
        terms = term_series.dropna().unique().tolist()
        # Key each term by its lowercase variant so it maps back to the
        # spelling used in the term list.
        mapping.update({term.lower(): term for term in terms if isinstance(term, str)})
        if has_term_column:
            print(f"Loaded {len(mapping)} terms from {csv_path}")
    except Exception as e:
        print(f"Error loading CSV: {e}")

# Hand-written overrides take precedence over anything read from the CSV.
manual_mapping = {
    '工行': 'Industrial and Commercial Bank of China',
    '蚂蚁金服': 'Ant Group',
    'ICBC': 'Industrial and Commercial Bank of China',
    'ALIPAY': 'Alipay'
}
for alias, canonical in manual_mapping.items():
    # Register both the lowercase form and the original spelling.
    mapping.update({alias.lower(): canonical, alias: canonical})

standardizer = FinancialTermStandardizer(mapping)

# Smoke test: push a handful of sentences through the standardizer.
test_sentences = [
    "I went to icbc to deposit money.",
    "Using alipay is convenient.",
    "工行 is a large bank.",
    "The a priori probability is high.", # Should capitalize 'A Priori Probability'
    "we need a round financing." # Should capitalize 'A Round Financing'
]

print("Standardization Results:")
divider = "-" * 20
for sentence in test_sentences:
    print(f"Original: {sentence}")
    standardized = standardizer.standardize(sentence)
    print(f"Standardized: {standardized}")
    print(divider)

Loaded 15724 terms from data/万条金融标准术语.csv
Standardization Results:
Original: I went to icbc to deposit money.
Standardized: I went to Industrial and Commercial Bank of China to Deposit Money.
--------------------
Original: Using alipay is convenient.
Standardized: Using Alipay is convenient.
--------------------
Original: 工行 is a large bank.
Standardized: Industrial and Commercial Bank of China is a large Bank.
--------------------
Original: The a priori probability is high.
Standardized: The A Priori Probability is high.
--------------------
Original: we need a round financing.
Standardized: we need A Round Financing.
--------------------


# 2. 自学习并实践不同的数据导入方法
Self-study and practice different data import methods.

In [3]:
# Ensure the data directory exists. exist_ok=True makes the cell safe to
# re-run and avoids the check-then-create race of a separate exists() test.
# (Assuming data generation logic is handled externally or here if needed)
# For this notebook execution, we assume data/sample.txt, csv, pdf exist.
os.makedirs('data', exist_ok=True)

def read_text_file(path):
    """Return the entire content of the UTF-8 encoded text file at ``path``."""
    with open(path, mode='r', encoding='utf-8') as handle:
        contents = handle.read()
    return contents

def read_csv_file(path):
    """Parse the CSV file at ``path`` into a pandas DataFrame using pandas' defaults."""
    frame = pd.read_csv(filepath_or_buffer=path)
    return frame

def read_pdf_file(path):
    """Return the text of every page of the PDF at ``path``, joined by newlines.

    A page for which ``extract_text()`` yields nothing contributes an empty
    string, so the result always has one segment per page.
    """
    with pdfplumber.open(path) as pdf:
        # extract_text() can be None for a page without extractable text
        # (the notebook's PDFParser guards the same way); joining None
        # would raise TypeError.
        return "\n".join(page.extract_text() or "" for page in pdf.pages)

# Each demo pairs the banner to print with a callable producing the preview.
# Failures are reported inline so one missing sample file does not stop the
# remaining demos.
import_demos = [
    ("--- TXT Import ---", lambda: read_text_file('data/sample.txt')[:100]),
    ("\n--- CSV Import ---", lambda: read_csv_file('data/sample.csv').head()),
    ("\n--- PDF Import ---", lambda: read_pdf_file('data/sample.pdf')[:100]),
]

for banner, make_preview in import_demos:
    print(banner)
    try:
        print(make_preview())
    except Exception as e:
        print(e)

--- TXT Import ---
This is a sample text file for the RAG homework.
It contains some financial terms like ICBC and ALIP

--- CSV Import ---
   id                content    source
0   1  Financial Report 2023  Internal
1   2     Market Analysis Q1  External

--- PDF Import ---
Financial Report Overview
This document contains financial data.
The table below shows the revenue:



# 3. 重构 Load File, Chunk File, Parse File
Refactor Load File, Chunk File, and Parse File.

In [4]:
class DataLoader:
    """Load a file from disk and return its content as a single string.

    Supported types are ``txt`` (raw text), ``csv`` (JSON records) and
    ``pdf`` (delegated to ``PDFParser``).
    """

    def load(self, file_path: str, file_type: Optional[str] = None) -> str:
        """Read ``file_path`` and return its content as text.

        Args:
            file_path: Path of the file to read.
            file_type: One of ``txt``/``csv``/``pdf``; case and a leading dot
                are ignored. When omitted, it is taken from the extension of
                ``file_path``.

        Raises:
            FileNotFoundError: ``file_path`` does not exist.
            ValueError: the type is unsupported or cannot be inferred.
        """
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"File not found: {file_path}")

        if file_type is None:
            # splitext only inspects the last path component, so dots in
            # directory names are not mistaken for an extension.
            file_type = os.path.splitext(file_path)[1]
            if not file_type:
                raise ValueError(f"Cannot infer file type from path: {file_path}")

        # Accept 'PDF', '.pdf', ... as well as the bare lowercase form.
        file_type = file_type.lower().lstrip('.')

        handlers = {
            'txt': self._load_txt,
            'csv': self._load_csv,
            'pdf': self._load_pdf,
        }
        handler = handlers.get(file_type)
        if handler is None:
            raise ValueError(f"Unsupported file type: {file_type}")
        return handler(file_path)

    def _load_txt(self, path):
        """Return the content of a UTF-8 text file."""
        with open(path, 'r', encoding='utf-8') as f:
            return f.read()

    def _load_csv(self, path):
        """Return the CSV rows as a JSON array of records."""
        # Convert CSV to string representation for RAG
        df = pd.read_csv(path)
        # force_ascii=False keeps non-ASCII text (e.g. Chinese terms) readable
        # instead of \uXXXX escapes, matching PDFParser's ensure_ascii=False.
        return df.to_json(orient='records', indent=2, force_ascii=False)

    def _load_pdf(self, path):
        """Return the JSON string produced by ``PDFParser.parse``."""
        # PDFParser is defined in a later cell; it is looked up at call time.
        return PDFParser.parse(path)

loader = DataLoader()
print("DataLoader initialized.")

DataLoader initialized.


In [5]:
class PDFParser:
    """Extract per-page text, tables and image metadata from a PDF."""

    @staticmethod
    def parse(file_path: str) -> str:
        """
        Parses PDF, extracting text and tables.
        Returns a JSON string containing text and metadata.

        The JSON object has the keys ``file_path`` and ``pages``; every page
        entry carries ``page_number`` (1-based), ``text`` (empty string when
        nothing was extracted), ``tables`` and ``images_metadata``.
        """
        result = {
            "file_path": file_path,
            "pages": []
        }

        with pdfplumber.open(file_path) as pdf:
            for page_number, page in enumerate(pdf.pages, start=1):
                page_data = {
                    "page_number": page_number,
                    "text": page.extract_text() or "",
                    "tables": page.extract_tables(),
                    "images_metadata": page.images  # Metadata about images (pos, size)
                }
                result["pages"].append(page_data)

        # Image metadata can hold values the json module cannot encode
        # (presumably stream/colorspace objects from the PDF backend - verify
        # against the pdfplumber version in use). default=str stringifies
        # those instead of failing the whole parse with TypeError.
        return json.dumps(result, ensure_ascii=False, indent=2, default=str)

print("PDFParser defined.")

PDFParser defined.


In [6]:
class TextChunker:
    """Split text into chunks of the form ``{"content": str, "metadata": dict}``.

    Args:
        chunk_size: Target chunk length in characters.
        overlap: Number of characters shared by consecutive fixed-size chunks.
    """

    def __init__(self, chunk_size=100, overlap=20):
        self.chunk_size = chunk_size
        self.overlap = overlap

    def chunk(self, text: str, method='fixed') -> List[Dict[str, Any]]:
        """Chunk ``text`` with the ``'fixed'`` or ``'recursive'`` strategy.

        Raises:
            ValueError: ``method`` is unknown, or the fixed strategy is used
                with a ``chunk_size``/``overlap`` pair that cannot advance.
        """
        if method == 'fixed':
            return self._chunk_fixed(text)
        elif method == 'recursive':
            return self._chunk_recursive(text)
        else:
            raise ValueError("Unknown chunking method")

    def _chunk_fixed(self, text: str) -> List[Dict[str, Any]]:
        """Slide a ``chunk_size`` window over ``text``, stepping by ``chunk_size - overlap``."""
        step = self.chunk_size - self.overlap
        # A non-positive step would never advance `start` and loop forever.
        if self.chunk_size <= 0 or step <= 0:
            raise ValueError(
                "chunk_size must be positive and greater than overlap "
                f"(got chunk_size={self.chunk_size}, overlap={self.overlap})"
            )

        chunks = []
        text_len = len(text)
        start = 0
        while start < text_len:
            end = min(start + self.chunk_size, text_len)
            chunks.append({
                "content": text[start:end],
                "metadata": {"start": start, "end": end, "method": "fixed"}
            })
            # Once a window reaches the end of the text, a further window
            # would only repeat part of the overlap already emitted.
            if end == text_len:
                break
            start += step
        return chunks

    def _chunk_recursive(self, text: str) -> List[Dict[str, Any]]:
        """Pack consecutive lines into chunks shorter than ``chunk_size``.

        Simplified stand-in for a recursive splitter: the text is split on
        newlines only, and a single line longer than ``chunk_size`` is kept
        as one oversized chunk. Chunks that are empty after stripping
        whitespace are not emitted.
        """
        # In a real scenario, use langchain's RecursiveCharacterTextSplitter
        chunks = []

        def emit(buffer: str) -> None:
            content = buffer.strip()
            if content:
                chunks.append({
                    "content": content,
                    "metadata": {"method": "recursive"}
                })

        current_chunk = ""
        for part in text.split('\n'):
            if len(current_chunk) + len(part) < self.chunk_size:
                current_chunk += part + "\n"
            else:
                emit(current_chunk)
                current_chunk = part + "\n"
        emit(current_chunk)

        return chunks

chunker = TextChunker(chunk_size=50, overlap=10)
print("TextChunker defined.")

TextChunker defined.


In [7]:
print("--- Integrated Pipeline Test ---")

try:
    # Step 1: load the PDF; the loader hands the file to the parser and
    # returns its JSON string.
    loaded_data = loader.load('data/sample.pdf', file_type='pdf')
    print("Data Loaded (First 200 chars):", loaded_data[:200])

    # Decode the JSON again to reach the per-page text and tables.
    data_dict = json.loads(loaded_data)
    page_texts = []
    for page in data_dict['pages']:
        page_texts.append(page['text'] + "\n")
        page_tables = page['tables']
        if page_tables:
            print(f"Found {len(page_tables)} tables on page {page['page_number']}")
            print("Table 1:", page_tables[0])
    full_text = "".join(page_texts)

    # Step 2: standardize the financial terms in the combined text.
    std_text = standardizer.standardize(full_text)
    print("\nStandardized Text (Snippet):", std_text[:100])

    # Step 3: cut the standardized text into fixed-size chunks.
    chunks = chunker.chunk(std_text, method='fixed')
    print(f"\nGenerated {len(chunks)} chunks using fixed method.")
    print("Chunk 1:", chunks[0])

    # Persist the chunks as JSON for later use.
    with open('data/output_chunks.json', 'w', encoding='utf-8') as out_file:
        json.dump(chunks, out_file, ensure_ascii=False, indent=2)
    print("Chunks saved to data/output_chunks.json")

except Exception as e:
    # Report the failure with a full traceback instead of stopping the notebook.
    print("Error in integrated test:", e)
    import traceback
    traceback.print_exc()

--- Integrated Pipeline Test ---
Data Loaded (First 200 chars): {
  "file_path": "data/sample.pdf",
  "pages": [
    {
      "page_number": 1,
      "text": "Financial Report Overview\nThis document contains financial data.\nThe table below shows the revenue:\nYea

Standardized Text (Snippet): Financial Report Overview
This document contains financial data.
The table below shows the Revenue:


Generated 4 chunks using fixed method.
Chunk 1: {'content': 'Financial Report Overview\nThis document contains f', 'metadata': {'start': 0, 'end': 50, 'method': 'fixed'}}
Chunks saved to data/output_chunks.json
