In [3]:
import stanza
import os
import csv
import pandas as pd

In [4]:
# Download the Persian ('fa') Stanza resources, then build the pipeline that
# process_text_chunk() reaches through the module-level name `nlp`.
print("Initializing Persian NLP pipeline...")
stanza.download('fa', verbose=False)
# Only tokenization and POS tagging are requested for this pipeline.
nlp = stanza.Pipeline(lang='fa', processors='tokenize,pos', verbose=False)

# POS tag translations to Persian.
# Keys are Universal POS (UPOS) tags as read from `word.upos`; values are the
# Persian labels written to the output. Lookups elsewhere fall back to the raw
# tag when a key is missing, so every tag the tagger can emit should be listed.
# 'AUX' and 'PROPN' belong to the UPOS tag set and were previously absent.
# 'CONJ' is kept for backward compatibility next to 'CCONJ'/'SCONJ'.
pos_translations = {
    'NOUN': 'اسم',
    'PROPN': 'اسم خاص',
    'VERB': 'فعل',
    'AUX': 'فعل کمکی',
    'ADJ': 'صفت',
    'ADV': 'قید',
    'PRON': 'ضمیر',
    'DET': 'تعیینکننده',
    'ADP': 'حرف اضافه',
    'NUM': 'عدد',
    'CONJ': 'حرف ربط',
    'CCONJ': 'حرف ربط هماهنگکننده',
    'SCONJ': 'حرف ربط تابعساز',
    'PART': 'ذره',
    'INTJ': 'حرف تعجب',
    'PUNCT': 'نشانه نگارشی',
    'SYM': 'نماد',
    'X': 'سایر'
}


Initializing Persian NLP pipeline...


In [6]:
def read_file(file_path):
    """Load a UTF-8 text file and return its content without outer whitespace.

    Args:
        file_path: Path of the text file to read.

    Returns:
        The stripped file content as a string, or None when the path does not
        exist or reading fails (a message is printed in both cases).
    """
    if os.path.exists(file_path):
        try:
            with open(file_path, 'r', encoding='utf-8') as handle:
                raw = handle.read()
            text = raw.strip()
            print(f"Successfully read {len(text)} characters from {file_path}")
            return text
        except Exception as err:
            print(f"Error reading file {file_path}: {err}")
    else:
        print(f"File {file_path} not found!")
    return None


In [7]:
def read_banking_terms_from_csv(csv_file):
    """Read banking terms from a CSV file.

    Every non-empty cell of every row is treated as one term. Terms are
    stripped of surrounding whitespace and de-duplicated while keeping the
    order of first appearance.

    Args:
        csv_file: Path of the CSV file holding the terms.

    Returns:
        A list of unique term strings; an empty list when the file is missing
        or cannot be read (a message is printed in both cases).
    """
    if not os.path.exists(csv_file):
        print(f"Banking terms file {csv_file} not found!")
        print("Please create banking_terms.csv with your banking terms.")
        return []

    try:
        # 'utf-8-sig' drops a leading byte-order mark (written by Excel and
        # some editors); with plain 'utf-8' the BOM stays glued to the first
        # term, which then never matches any word. Files without a BOM are
        # read exactly as before. newline='' is what the csv module expects.
        with open(csv_file, 'r', encoding='utf-8-sig', newline='') as f:
            cells = (cell.strip() for row in csv.reader(f) for cell in row)
            # dict.fromkeys removes duplicates while preserving order.
            banking_terms = list(dict.fromkeys(term for term in cells if term))

        print(f"Successfully loaded {len(banking_terms)} unique banking terms from {csv_file}")
        return banking_terms

    except Exception as e:
        print(f"Error reading banking terms from {csv_file}: {e}")
        return []


In [8]:
def split_text_by_sentences(text, max_sentences_per_chunk=30):
    """Cut text after each sentence-ending mark and regroup the pieces.

    Args:
        text: The text to split.
        max_sentences_per_chunk: Number of sentences joined into one chunk.

    Returns:
        A list of chunk strings; each chunk is up to
        ``max_sentences_per_chunk`` stripped sentences joined by one space.
    """
    terminators = ('۔', '.', '!', '?', '؟', ':', '؛')

    pieces = []
    start = 0
    for position, symbol in enumerate(text):
        if symbol not in terminators:
            continue
        # A sentence runs from the previous cut up to and including the mark.
        candidate = text[start:position + 1].strip()
        if candidate:
            pieces.append(candidate)
        start = position + 1

    # Whatever follows the last mark counts as a final sentence.
    remainder = text[start:].strip()
    if remainder:
        pieces.append(remainder)

    grouped = (
        ' '.join(pieces[offset:offset + max_sentences_per_chunk])
        for offset in range(0, len(pieces), max_sentences_per_chunk)
    )
    return [group for group in grouped if group.strip()]


In [9]:
def process_text_chunk(text_chunk, chunk_num=1, total_chunks=1):
    """Run the module-level ``nlp`` pipeline on one chunk and collect its words.

    Args:
        text_chunk: The text to analyse.
        chunk_num: Position of this chunk, used only in progress messages.
        total_chunks: Total number of chunks, used only in progress messages.

    Returns:
        A list of dicts with keys 'word', 'pos' (the UPOS tag) and
        'persian_pos' (the label from ``pos_translations``, or the raw tag
        when no translation exists). Any exception is printed and turned into
        an empty list.
    """
    try:
        print(f"Processing chunk {chunk_num}/{total_chunks} ({len(text_chunk)} characters)")
        parsed = nlp(text_chunk)
        tagged = [
            {
                'word': token.text,
                'pos': token.upos,
                'persian_pos': pos_translations.get(token.upos, token.upos),
            }
            for sent in parsed.sentences
            for token in sent.words
            if token.text.strip()  # skip whitespace-only tokens
        ]
        print(f"Chunk {chunk_num} processed: {len(tagged)} words extracted")
        return tagged
    except Exception as err:
        print(f"Error processing chunk {chunk_num}: {err}")
        return []


In [10]:
def process_text_safely(text):
    """POS-tag a text, splitting it into sentence-based chunks when it is long.

    Texts shorter than 3000 characters are passed to ``process_text_chunk``
    in one piece; longer texts are split into chunks of 25 sentences.

    Args:
        text: The text to process.

    Returns:
        The combined list of word records from all chunks, or an empty list
        for empty/whitespace-only input.
    """
    if not (text and text.strip()):
        print("Empty text provided")
        return []

    print(f"Starting text processing... ({len(text)} characters)")

    if len(text) < 3000:
        return process_text_chunk(text)

    print("Large text detected, splitting into chunks...")
    text_chunks = split_text_by_sentences(text, max_sentences_per_chunk=25)
    total = len(text_chunks)
    print(f"Text split into {total} chunks")

    collected = []
    succeeded = 0
    for number, piece in enumerate(text_chunks, start=1):
        piece_results = process_text_chunk(piece, number, total)
        if not piece_results:
            # An empty list is what process_text_chunk returns on error.
            print(f"Warning: Chunk {number} failed to process")
            continue
        collected.extend(piece_results)
        succeeded += 1

    print(f"Processing completed: {succeeded}/{total} chunks successful")
    print(f"Total words extracted: {len(collected)}")
    return collected


In [11]:
def extract_banking_entities(results, banking_terms):
    """Extract two-word banking entities from POS-tagged words.

    Adjacent word pairs are scanned in order. A pair is reported when the
    first word is a banking term followed by a NOUN, or otherwise when the
    first word is a NOUN followed by a banking term. Each distinct
    "first second" combination is reported once.

    Args:
        results: Sequence of dicts with at least the keys 'word' and 'pos'.
        banking_terms: Collection of banking term strings.

    Returns:
        A list of dicts with keys 'entity' (the two words joined by a space)
        and 'banking_term' (the term that triggered the match), in order of
        first occurrence. Empty when no terms are given.
    """
    if not banking_terms:
        print("No banking terms provided for entity extraction")
        return []

    # Membership is tested twice per word pair; a set keeps each test O(1)
    # instead of scanning the whole term list every time.
    term_lookup = set(banking_terms)

    entities = []
    seen_entities = set()

    print(f"Searching for banking entities using {len(banking_terms)} terms...")

    for current_word, next_word in zip(results, results[1:]):
        if current_word['word'] in term_lookup and next_word['pos'] == 'NOUN':
            matched_term = current_word['word']
        elif current_word['pos'] == 'NOUN' and next_word['word'] in term_lookup:
            matched_term = next_word['word']
        else:
            continue

        entity_text = f"{current_word['word']} {next_word['word']}"
        if entity_text in seen_entities:
            continue
        seen_entities.add(entity_text)
        entities.append({
            'entity': entity_text,
            'banking_term': matched_term
        })

    print(f"Found {len(entities)} unique banking entities")
    return entities


In [12]:
def save_results_csv(results, output_file):
    """Write the POS-tagging records to a UTF-8 CSV file.

    Args:
        results: Iterable of dicts with keys 'word', 'pos' and 'persian_pos'.
        output_file: Destination path; an existing file is overwritten.

    Returns:
        True when the file was written, False when any error occurred (the
        error is printed).
    """
    header = ['کلمه', 'تگ', 'نقش']
    try:
        with open(output_file, 'w', encoding='utf-8', newline='') as out:
            sheet = csv.writer(out)
            sheet.writerow(header)
            sheet.writerows(
                (record['word'], record['pos'], record['persian_pos'])
                for record in results
            )
        print(f"POS results saved to {output_file}")
    except Exception as err:
        print(f"Error saving POS results: {err}")
        return False
    return True


In [13]:
def save_banking_entities_csv(entities, output_file):
    """Write the extracted banking entities to a UTF-8 CSV file.

    Args:
        entities: Iterable of dicts with keys 'banking_term' and 'entity'.
        output_file: Destination path; an existing file is overwritten.

    Returns:
        True when the file was written, False when any error occurred (the
        error is printed).
    """
    header = ['واژه بانکی', 'ترکیب شناسایی شده']
    try:
        with open(output_file, 'w', encoding='utf-8', newline='') as out:
            sheet = csv.writer(out)
            sheet.writerow(header)
            sheet.writerows(
                (found['banking_term'], found['entity']) for found in entities
            )
        print(f"Banking entities saved to {output_file}")
    except Exception as err:
        print(f"Error saving banking entities CSV: {err}")
        return False
    return True

def save_banking_entities_excel(entities, output_file):
    """Write the extracted banking entities to an Excel workbook.

    Args:
        entities: Iterable of dicts with keys 'banking_term' and 'entity'.
        output_file: Destination .xlsx path.

    Returns:
        True when the workbook was written; False when an ImportError is
        raised (an install hint is printed) or any other error occurs.
    """
    try:
        term_column = []
        entity_column = []
        for found in entities:
            term_column.append(found['banking_term'])
            entity_column.append(found['entity'])

        frame = pd.DataFrame({
            'واژه بانکی': term_column,
            'ترکیب شناسایی شده': entity_column
        })
        frame.to_excel(output_file, index=False, engine='openpyxl')
    except ImportError:
        print("pandas or openpyxl not installed. Install with: pip install pandas openpyxl")
        return False
    except Exception as err:
        print(f"Error saving banking entities Excel: {err}")
        return False
    print(f"Banking entities saved to {output_file}")
    return True


In [14]:
def print_sample_results(results, sample_size=10):
    """Intentional no-op: POS results are only saved to files, never printed.

    Both arguments are accepted and ignored.
    """
    return None

def print_banking_entities(entities, max_display=20):
    """Intentional no-op: banking entities are only saved to files, never printed.

    Both arguments are accepted and ignored.
    """
    return None


In [15]:
def main():
    """Run the whole workflow: load terms, read text, tag, extract and save.

    Reads ``banking_terms.csv`` and ``test.txt`` from the working directory,
    writes ``pos_output.csv`` and, when entities are found,
    ``banking_entities.csv`` and ``banking_entities.xlsx``. Stops early, after
    printing a message, when a step yields nothing. Returns None.
    """
    print("=== Persian Banking Entity Extractor ===\n")

    banking_terms_file = "banking_terms.csv"
    input_file = "test.txt"

    print("Step 1: Loading banking terms...")
    banking_terms = read_banking_terms_from_csv(banking_terms_file)
    if not banking_terms:
        print("❌ No banking terms loaded. Please create banking_terms.csv file.")
        return

    print("\nStep 2: Reading input text...")
    text = read_file(input_file)
    if not text:
        # Name the file actually configured above instead of a fixed literal.
        print(f"❌ Failed to read input text. Please check {input_file} file.")
        return

    print("\nStep 3: Processing text for POS tagging...")
    results = process_text_safely(text)
    if not results:
        print("❌ Text processing failed.")
        return

    # The save helpers return False on failure; only files that were really
    # written are listed in the summary.
    created_files = []

    print("\nStep 4: Saving POS tagging results...")
    if save_results_csv(results, "pos_output.csv"):
        created_files.append("pos_output.csv (POS tagging results)")

    print("\nStep 5: Extracting banking entities...")
    banking_entities = extract_banking_entities(results, banking_terms)

    if not banking_entities:
        print("❌ No banking entities found in the text.")
        return

    print("\nStep 6: Saving banking entities...")
    if save_banking_entities_csv(banking_entities, "banking_entities.csv"):
        created_files.append("banking_entities.csv (Banking entities)")
    if save_banking_entities_excel(banking_entities, "banking_entities.xlsx"):
        created_files.append("banking_entities.xlsx (Banking entities - Excel)")

    print("\n=== Summary ===")
    print(f"✅ Text processed: {len(text)} characters")
    print(f"✅ Words analyzed: {len(results)}")
    print(f"✅ Banking terms loaded: {len(banking_terms)}")
    print(f"✅ Unique banking entities found: {len(banking_entities)}")
    if created_files:
        print("✅ Output files created:")
        for description in created_files:
            print(f"   - {description}")
    else:
        print("❌ No output files were created.")


In [19]:
import stanza

# Build a pipeline on CPU; set use_gpu=True if you have a GPU available.
# This rebinds the module-level name `nlp`, and unlike the 'tokenize,pos'
# pipeline built earlier it also requests the 'mwt' and 'lemma' processors.
nlp = stanza.Pipeline(
    lang='fa',
    processors='tokenize,mwt,pos,lemma',
    use_gpu=False
)


In [22]:
import stanza

# CPU-only Persian pipeline with lemmatization (pass use_gpu=True when a GPU
# is available). This rebinds the module-level name `nlp`.
nlp = stanza.Pipeline(lang='fa', processors='tokenize,mwt,pos,lemma', use_gpu=False)

# Alternative sample sentence kept for quick experiments:
# text = "کتاب‌های جذاب را من دیروز خریدم."
text = " در تبادل مانده های انتقالی کاربر پیغام خطای کد ملیتان وارد شده را دریافت میکند و بعد از آن میخواهیم ببینیم مساله حل شده یا نشده است اتمام "

doc = nlp(text)

# Show each word next to the lemma the pipeline assigned to it.
words = (word for sentence in doc.sentences for word in sentence.words)
for word in words:
    print(f"{word.text}\t→ {word.lemma}")


در	→ در
تبادل	→ تبادل
مانده	→ مانده
های	→ های
انتقالی	→ انتقالی
کاربر	→ کاربر
پیغام	→ پیغام
خطای	→ خطا
کد	→ کد
ملیتان	→ ملیت
وارد	→ وارد
شده	→ شد
را	→ را
دریافت	→ دریافت
میکند	→ کرد
و	→ و
بعد	→ بعد
از	→ از
آن	→ آن
میخواهیم	→ خواست
ببینیم	→ دید
مساله	→ مساله
حل	→ حل
شده	→ شد
یا	→ یا
نشده	→ شد
است	→ است
اتمام	→ اتمام


In [18]:
import stanza
import os
import csv
import pandas as pd

# Download the Persian ('fa') Stanza resources, then build the pipeline that
# process_text_chunk() reaches through the module-level name `nlp`.
print("Initializing Persian NLP pipeline...")
stanza.download('fa', verbose=False)
# Only tokenization and POS tagging are requested for this pipeline.
nlp = stanza.Pipeline(lang='fa', processors='tokenize,pos', verbose=False)

# POS tag translations to Persian.
# Keys are Universal POS (UPOS) tags as read from `word.upos`; values are the
# Persian labels written to the output. Lookups elsewhere fall back to the raw
# tag when a key is missing, so every tag the tagger can emit should be listed.
# 'AUX' and 'PROPN' belong to the UPOS tag set and were previously absent.
# 'CONJ' is kept for backward compatibility next to 'CCONJ'/'SCONJ'.
pos_translations = {
    'NOUN': 'اسم',
    'PROPN': 'اسم خاص',
    'VERB': 'فعل',
    'AUX': 'فعل کمکی',
    'ADJ': 'صفت',
    'ADV': 'قید',
    'PRON': 'ضمیر',
    'DET': 'تعیینکننده',
    'ADP': 'حرف اضافه',
    'NUM': 'عدد',
    'CONJ': 'حرف ربط',
    'CCONJ': 'حرف ربط هماهنگکننده',
    'SCONJ': 'حرف ربط تابعساز',
    'PART': 'ذره',
    'INTJ': 'حرف تعجب',
    'PUNCT': 'نشانه نگارشی',
    'SYM': 'نماد',
    'X': 'سایر'
}

def read_file(file_path):
    """Load a UTF-8 text file and return its content without outer whitespace.

    Args:
        file_path: Path of the text file to read.

    Returns:
        The stripped file content as a string, or None when the path does not
        exist or reading fails (a message is printed in both cases).
    """
    if os.path.exists(file_path):
        try:
            with open(file_path, 'r', encoding='utf-8') as handle:
                raw = handle.read()
            text = raw.strip()
            print(f"Successfully read {len(text)} characters from {file_path}")
            return text
        except Exception as err:
            print(f"Error reading file {file_path}: {err}")
    else:
        print(f"File {file_path} not found!")
    return None

# def read_banking_terms_from_csv(csv_file):
#     """Read banking terms from a CSV file"""
#     banking_terms = []
    
#     if not os.path.exists(csv_file):
#         print(f"Banking terms file {csv_file} not found!")
#         print("Please create banking_terms.csv with your banking terms.")
#         return []
    
#     try:
#         with open(csv_file, 'r', encoding='utf-8') as f:
#             reader = csv.reader(f)
#             for row_num, row in enumerate(reader, 1):
#                 for term in row:
#                     term = term.strip()
#                     if term:  # Only add non-empty terms
#                         banking_terms.append(term)
        
#         # Remove duplicates while preserving order
#         banking_terms = list(dict.fromkeys(banking_terms))
#         print(f"Successfully loaded {len(banking_terms)} unique banking terms from {csv_file}")
#         return banking_terms
        
#     except Exception as e:
#         print(f"Error reading banking terms from {csv_file}: {e}")
#         return []

def split_text_by_sentences(text, max_sentences_per_chunk=10):
    """Cut text after each sentence-ending mark and regroup the pieces.

    Args:
        text: The text to split.
        max_sentences_per_chunk: Number of sentences joined into one chunk.

    Returns:
        A list of chunk strings; each chunk is up to
        ``max_sentences_per_chunk`` stripped sentences joined by one space.
    """
    # Persian and English sentence markers.
    terminators = ('۔', '.', '!', '?', '؟', ':', '؛')

    pieces = []
    start = 0
    for position, symbol in enumerate(text):
        if symbol not in terminators:
            continue
        # A sentence runs from the previous cut up to and including the mark.
        candidate = text[start:position + 1].strip()
        if candidate:
            pieces.append(candidate)
        start = position + 1

    # Whatever follows the last mark counts as a final sentence.
    remainder = text[start:].strip()
    if remainder:
        pieces.append(remainder)

    grouped = (
        ' '.join(pieces[offset:offset + max_sentences_per_chunk])
        for offset in range(0, len(pieces), max_sentences_per_chunk)
    )
    return [group for group in grouped if group.strip()]

def split_text_by_characters(text, max_chars_per_chunk=1000):
    """Pack whitespace-separated words into chunks bounded by a character budget.

    Words are never cut: a single word longer than the budget becomes a chunk
    of its own.

    Args:
        text: The text to split.
        max_chars_per_chunk: Character budget for one chunk, counting the
            single spaces between words.

    Returns:
        A list of chunk strings with words joined by one space.
    """
    pieces = []
    pending = []       # words collected for the chunk being built
    pending_len = 0    # length of ' '.join(pending)

    for token in text.split():
        if pending and pending_len + len(token) + 1 > max_chars_per_chunk:
            # Adding this word would exceed the budget: close the chunk.
            pieces.append(' '.join(pending))
            pending = [token]
            pending_len = len(token)
        else:
            pending_len += len(token) + (1 if pending else 0)
            pending.append(token)

    if pending:
        pieces.append(' '.join(pending))

    return pieces

def split_text_by_words(text, max_words_per_chunk=100):
    """Group the whitespace-separated words of a text into fixed-size chunks.

    Args:
        text: The text to split.
        max_words_per_chunk: Number of words per chunk (the last chunk may be
            shorter).

    Returns:
        A list of chunk strings with words joined by one space.
    """
    tokens = text.split()
    grouped = (
        ' '.join(tokens[offset:offset + max_words_per_chunk])
        for offset in range(0, len(tokens), max_words_per_chunk)
    )
    return [group for group in grouped if group.strip()]

def process_text_chunk(text_chunk, chunk_num=1, total_chunks=1):
    """Run the module-level ``nlp`` pipeline on one chunk and collect its words.

    Args:
        text_chunk: The text to analyse.
        chunk_num: Position of this chunk, used only in progress messages.
        total_chunks: Total number of chunks, used only in progress messages.

    Returns:
        A list of dicts with keys 'word', 'pos' (the UPOS tag) and
        'persian_pos' (the label from ``pos_translations``, or the raw tag
        when no translation exists). Any exception is printed and turned into
        an empty list.
    """
    try:
        print(f"Processing chunk {chunk_num}/{total_chunks} ({len(text_chunk)} characters)")
        parsed = nlp(text_chunk)
        tagged = [
            {
                'word': token.text,
                'pos': token.upos,
                'persian_pos': pos_translations.get(token.upos, token.upos),
            }
            for sent in parsed.sentences
            for token in sent.words
            if token.text.strip()  # only keep non-empty words
        ]
        print(f"Chunk {chunk_num} processed: {len(tagged)} words extracted")
        return tagged
    except Exception as err:
        print(f"Error processing chunk {chunk_num}: {err}")
        return []

def process_text_safely(text):
    """POS-tag a text, choosing a chunking strategy from its length.

    Strategy by character count:
      * under 500: processed in one piece;
      * under 2000: chunks of 10 sentences;
      * under 10000: chunks of about 1000 characters;
      * otherwise: chunks of 100 words.
    If a text longer than 1000 characters still ends up as one chunk, it is
    re-split into chunks of about 800 characters.

    Args:
        text: The text to process.

    Returns:
        The combined list of word records from all chunks, or an empty list
        for empty/whitespace-only input.
    """
    if not (text and text.strip()):
        print("Empty text provided")
        return []

    size = len(text)
    print(f"Starting text processing... ({size} characters)")

    # Very small texts go through the pipeline in one call.
    if size < 500:
        return process_text_chunk(text)

    if size >= 10000:
        text_chunks = split_text_by_words(text, max_words_per_chunk=100)
        print(f"Large text: split into {len(text_chunks)} word-based chunks")
    elif size >= 2000:
        text_chunks = split_text_by_characters(text, max_chars_per_chunk=1000)
        print(f"Medium text: split into {len(text_chunks)} character-based chunks")
    else:
        text_chunks = split_text_by_sentences(text, max_sentences_per_chunk=10)
        print(f"Small text: split into {len(text_chunks)} sentence-based chunks")

    # A longer text that was not split at all is forced into smaller pieces.
    if size > 1000 and len(text_chunks) == 1:
        text_chunks = split_text_by_characters(text, max_chars_per_chunk=800)
        print(f"Forced additional splitting: {len(text_chunks)} chunks")

    total = len(text_chunks)
    collected = []
    succeeded = 0
    failed = 0

    print(f"Processing {total} chunks...")

    for number, piece in enumerate(text_chunks, start=1):
        try:
            piece_results = process_text_chunk(piece, number, total)
            if not piece_results:
                failed += 1
                print(f"Warning: Chunk {number} returned no results")
            else:
                collected.extend(piece_results)
                succeeded += 1
        except Exception as err:
            failed += 1
            print(f"Error: Chunk {number} failed with error: {err}")

    print(f"Processing completed:")
    print(f"  ✅ Successful chunks: {succeeded}")
    print(f"  ❌ Failed chunks: {failed}")
    print(f"  📊 Total words extracted: {len(collected)}")

    return collected

# def extract_banking_entities(results, banking_terms):
#     """Extract banking entities using terms from CSV file"""
#     if not banking_terms:
#         print("No banking terms provided for entity extraction")
#         return []
    
#     entities = []
#     seen_entities = set()  # Track unique combinations
    
#     print(f"Searching for banking entities using {len(banking_terms)} terms...")
    
#     for i in range(len(results) - 1):
#         current_word = results[i]
#         next_word = results[i + 1]
        
#         # Check for banking term + noun
#         if current_word['word'] in banking_terms and next_word['pos'] == 'NOUN':
#             entity_text = f"{current_word['word']} {next_word['word']}"
#             if entity_text not in seen_entities:
#                 entity = {
#                     'entity': entity_text,
#                     'banking_term': current_word['word']
#                 }
#                 entities.append(entity)
#                 seen_entities.add(entity_text)
        
#         # Check for noun + banking term
#         elif current_word['pos'] == 'NOUN' and next_word['word'] in banking_terms:
#             entity_text = f"{current_word['word']} {next_word['word']}"
#             if entity_text not in seen_entities:
#                 entity = {
#                     'entity': entity_text,
#                     'banking_term': next_word['word']
#                 }
#                 entities.append(entity)
#                 seen_entities.add(entity_text)
    
#     print(f"Found {len(entities)} unique banking entities")
#     return entities

def save_results_csv(results, output_file):
    """Write the POS-tagging records to a UTF-8 CSV file.

    Args:
        results: Iterable of dicts with keys 'word', 'pos' and 'persian_pos'.
        output_file: Destination path; an existing file is overwritten.

    Returns:
        True when the file was written, False when any error occurred (the
        error is printed).
    """
    header = ['کلمه', 'تگ', 'نقش']
    try:
        with open(output_file, 'w', encoding='utf-8', newline='') as out:
            sheet = csv.writer(out)
            sheet.writerow(header)
            sheet.writerows(
                (record['word'], record['pos'], record['persian_pos'])
                for record in results
            )
        print(f"POS results saved to {output_file}")
    except Exception as err:
        print(f"Error saving POS results: {err}")
        return False
    return True

# def save_banking_entities_csv(entities, output_file):
#     """Save banking entities to CSV"""
#     try:
#         with open(output_file, 'w', encoding='utf-8', newline='') as f:
#             writer = csv.writer(f)
#             writer.writerow(['واژه بانکی', 'ترکیب شناسایی شده'])
            
#             for entity in entities:
#                 writer.writerow([entity['banking_term'], entity['entity']])
        
#         print(f"Banking entities saved to {output_file}")
#         return True
#     except Exception as e:
#         print(f"Error saving banking entities CSV: {e}")
#         return False

# def save_banking_entities_excel(entities, output_file):
#     """Save banking entities to Excel"""
#     try:
#         data = {
#             'واژه بانکی': [entity['banking_term'] for entity in entities],
#             'ترکیب شناسایی شده': [entity['entity'] for entity in entities]
#         }
#         df = pd.DataFrame(data)
#         df.to_excel(output_file, index=False, engine='openpyxl')
#         print(f"Banking entities saved to {output_file}")
#         return True
#     except ImportError:
#         print("pandas or openpyxl not installed. Install with: pip install pandas openpyxl")
#         return False
#     except Exception as e:
#         print(f"Error saving banking entities Excel: {e}")
#         return False

def print_sample_results(results, sample_size=10):
    """Intentional no-op: POS results are only saved to files, never printed.

    Both arguments are accepted and ignored.
    """
    return None

def print_banking_entities(entities, max_display=20):
    """Intentional no-op: banking entities are only saved to files, never printed.

    Both arguments are accepted and ignored.
    """
    return None

def main():
    """Main execution function: read the input text, POS-tag it, save the result.

    Reads ``notices.txt`` from the working directory and writes
    ``pos_output.csv``. The banking-term steps (1, 5 and 6) are commented out
    in this version, so only POS tagging is performed. Stops early, after
    printing a message, when reading or processing yields nothing.
    Returns None.
    """
    print("=== Persian Banking Entity Extractor ===\n")

    # File paths
    # banking_terms_file = "banking_terms.csv"
    input_file = "notices.txt"

    # Step 1: Load banking terms (disabled)
    # print("Step 1: Loading banking terms...")
    # banking_terms = read_banking_terms_from_csv(banking_terms_file)
    # if not banking_terms:
    #     print("❌ No banking terms loaded. Please create banking_terms.csv file.")
    #     return

    # Step 2: Read input text
    print("\nStep 2: Reading input text...")
    text = read_file(input_file)
    if not text:
        # Name the file actually configured above; the message used to point
        # at "test.txt" although "notices.txt" is what gets read.
        print(f"❌ Failed to read input text. Please check {input_file} file.")
        return

    # Step 3: Process text for POS tagging
    print("\nStep 3: Processing text for POS tagging...")
    results = process_text_safely(text)
    if not results:
        print("❌ Text processing failed.")
        return

    # Step 4: Save POS results. save_results_csv returns False on failure, so
    # keep the outcome and report the file only when it was really written.
    print("\nStep 4: Saving POS tagging results...")
    pos_saved = save_results_csv(results, "pos_output.csv")

    # Step 5: Extract banking entities (disabled)
    # print("\nStep 5: Extracting banking entities...")
    # banking_entities = extract_banking_entities(results, banking_terms)
    # if not banking_entities:
    #     print("❌ No banking entities found in the text.")
    #     return

    # Step 6: Save banking entities (disabled)
    # print("\nStep 6: Saving banking entities...")
    # save_banking_entities_csv(banking_entities, "banking_entities.csv")
    # save_banking_entities_excel(banking_entities, "banking_entities.xlsx")

    # Final summary
    print("\n=== Summary ===")
    print(f"✅ Text processed: {len(text)} characters")
    print(f"✅ Words analyzed: {len(results)}")
    # print(f"✅ Banking terms loaded: {len(banking_terms)}")
    # print(f"✅ Unique banking entities found: {len(banking_entities)}")
    if pos_saved:
        print("✅ Output files created:")
        print("   - pos_output.csv (POS tagging results)")
    else:
        print("❌ No output files were created.")
    # print(f"   - banking_entities.csv (Banking entities)")
    # print(f"   - banking_entities.xlsx (Banking entities - Excel)")

# Execute main function.
# The captured output below this cell shows main() running when the cell is
# executed, i.e. the guard is satisfied inside the notebook as well.
if __name__ == "__main__":
    main()

Initializing Persian NLP pipeline...
=== Persian Banking Entity Extractor ===


Step 2: Reading input text...
Successfully read 4512997 characters from notices.txt

Step 3: Processing text for POS tagging...
Starting text processing... (4512997 characters)
Large text: split into 8380 word-based chunks
Processing 8380 chunks...
Processing chunk 1/8380 (557 characters)
Chunk 1 processed: 100 words extracted
Processing chunk 2/8380 (534 characters)
Chunk 2 processed: 100 words extracted
Processing chunk 3/8380 (521 characters)
Chunk 3 processed: 101 words extracted
Processing chunk 4/8380 (510 characters)
Chunk 4 processed: 100 words extracted
Processing chunk 5/8380 (488 characters)
Chunk 5 processed: 100 words extracted
Processing chunk 6/8380 (507 characters)
Chunk 6 processed: 100 words extracted
Processing chunk 7/8380 (504 characters)
Chunk 7 processed: 100 words extracted
Processing chunk 8/8380 (499 characters)
Chunk 8 processed: 100 words extracted
Processing chunk 9/8380 (544 ch