In [4]:
%pip -q install stanza

Note: you may need to restart the kernel to use updated packages.


In [30]:
import os
import re
import stanza

# Directory that holds the raw .txt files to lemmatise.
input_folder = 'example'

# Output directory per pipeline key; 'all' has no pipeline entry of its own.
output_folders = {
    'ancient-hebrew': 'lemma_stanza_anc',
    'hebrew-1': 'lemma_stanza_HTB',
    'hebrew-2': 'lemma_stanza_IAHLT',
    'all': 'lemma_stanza_all',
}

# Only the language / package selection differs between the three pipelines.
_pipeline_specs = {
    'ancient-hebrew': {'lang': 'hbo'},
    'hebrew-1': {'lang': 'he'},
    'hebrew-2': {'lang': 'he', 'package': 'iahltwiki'},
}

# Pipelines are built in the order listed above.
# NOTE(review): download_method=None presumably means the models must already
# exist under model_dir -- confirm against the Stanza documentation.
pipelines = {
    name: stanza.Pipeline(
        processors='tokenize, mwt, pos, lemma',
        model_dir='stanza_resources',
        download_method=None,
        use_gpu=True,
        **spec
    )
    for name, spec in _pipeline_specs.items()
}

2025-03-13 09:01:36 INFO: Loading these models for language: hbo (Ancient_Hebrew):
| Processor | Package       |
-----------------------------
| tokenize  | ptnk          |
| mwt       | ptnk          |
| pos       | ptnk_nocharlm |
| lemma     | ptnk_nocharlm |

2025-03-13 09:01:36 INFO: Using device: cpu
2025-03-13 09:01:36 INFO: Loading: tokenize
2025-03-13 09:01:36 INFO: Loading: mwt
2025-03-13 09:01:36 INFO: Loading: pos
2025-03-13 09:01:36 INFO: Loading: lemma
2025-03-13 09:01:36 INFO: Done loading processors!
2025-03-13 09:01:36 INFO: Loading these models for language: he (Hebrew):
| Processor | Package           |
---------------------------------
| tokenize  | combined          |
| mwt       | combined          |
| pos       | combined_charlm   |
| lemma     | combined_nocharlm |

2025-03-13 09:01:36 INFO: Using device: cpu
2025-03-13 09:01:36 INFO: Loading: tokenize
2025-03-13 09:01:36 INFO: Loading: mwt
2025-03-13 09:01:36 INFO: Loading: pos
2025-03-13 09:01:37 INFO: Loading

In [38]:
# UPOS tags whose words are replaced by a fixed placeholder instead of a lemma.
label_dict = {
    'PROPN': 'person1',
    'PRON': 'pron1',
    'NUM': 'ordinal1',
}

# Make sure every output directory exists; already-existing ones are accepted.
for folder in output_folders.values():
    os.makedirs(folder, exist_ok=True)

def clean_text(text):
    """Delete every character of ``text`` that is not in the allowed set.

    Kept characters: the U+0590-U+05FF range, the digits 0-9, whitespace and
    the punctuation marks . , : ! ? ' " « » ( ) [ ] -
    Removed characters are replaced by the empty string, so the whitespace
    around them is left as it was (it is not collapsed).

    Returns the filtered string.
    """
    return re.sub(r'[^\u0590-\u05FF\u0030-\u0039\s\.,:!?\'"«»\(\)\[\]\-]', '', text)

def process_text_with_stanza(text, pipeline):
    """Run ``pipeline`` on ``text`` and flatten the result to one string.

    For every word of every sentence in the pipeline output:
      * words whose ``upos`` is None are skipped;
      * words whose ``upos`` is a key of ``label_dict`` contribute the mapped
        placeholder;
      * words tagged 'PUNCT' are skipped;
      * all other words contribute their lower-cased lemma, or the
        lower-cased surface text when the lemma is empty/None.

    Returns the collected tokens joined by single spaces.
    """
    doc = pipeline(text)
    tokens = []

    for sent in doc.sentences:
        for w in sent.words:
            tag = w.upos
            if tag is None:
                continue

            # The placeholder lookup comes first, exactly as in the tag priority above.
            if tag in label_dict:
                tokens.append(label_dict[tag])
                continue

            if tag == 'PUNCT':
                continue

            tokens.append((w.lemma or w.text).lower())

    return ' '.join(tokens)

def split_processed_text(processed_text, separator):
    """Split ``processed_text`` on every occurrence of ``separator``.

    Thin wrapper around ``processed_text.split(separator)``; returns the
    resulting list of pieces.
    """
    pieces = processed_text.split(separator)
    return pieces

In [19]:
# Lemmatise each .txt file in input_folder: one output file per pipeline,
# plus a chained output written to the 'all' folder.
for filename in os.listdir(input_folder):
    if not filename.endswith('.txt'):
        continue

    input_path = os.path.join(input_folder, filename)
    with open(input_path, 'r', encoding='utf-8') as file:
        text = file.read()
    text = clean_text(text)

    # Independent runs: every pipeline receives the same cleaned text.
    for lang_code, pipeline in pipelines.items():
        processed_text = process_text_with_stanza(text, pipeline)

        output_path = os.path.join(output_folders[lang_code], filename)
        with open(output_path, 'w', encoding='utf-8') as output_file:
            output_file.write(processed_text)

    # Chained run: the output of each pipeline becomes the input of the next.
    combined_text = text
    for pipeline in pipelines.values():
        combined_text = process_text_with_stanza(combined_text, pipeline)

    combined_output_path = os.path.join(output_folders['all'], filename)
    with open(combined_output_path, 'w', encoding='utf-8') as combined_output_file:
        combined_output_file.write(combined_text)

In [39]:
def process_files_in_batches(input_folder, output_folders, pipelines, batch_size=8):
    """Read, clean and lemmatise the .txt files of ``input_folder`` in batches.

    File names are visited in sorted order; names not ending in '.txt' are
    ignored. Each file is read as UTF-8, stripped, passed through
    ``clean_text`` and queued. As soon as ``batch_size`` texts are queued they
    are handed to ``process_batch_and_save`` together with their file names,
    and the queue is emptied. Whatever is left after the last file is flushed
    the same way.

    Args:
        input_folder: directory to scan for .txt files.
        output_folders: forwarded unchanged to ``process_batch_and_save``.
        pipelines: forwarded unchanged to ``process_batch_and_save``.
        batch_size: number of queued texts that triggers a flush.
    """
    pending_texts, pending_names = [], []

    def flush():
        # Hand over the queued batch, then empty both queues in place.
        process_batch_and_save(pending_texts, pending_names, output_folders, pipelines)
        pending_texts.clear()
        pending_names.clear()

    for name in sorted(os.listdir(input_folder)):
        if name.endswith('.txt'):
            with open(os.path.join(input_folder, name), 'r', encoding='utf-8') as handle:
                raw = handle.read().strip()

            pending_texts.append(clean_text(raw))
            pending_names.append(name)

            if len(pending_texts) >= batch_size:
                flush()

    if pending_texts:
        flush()

def process_batch_and_save(texts_batch, filenames_batch, output_folders, pipelines):
    """Lemmatise one batch with every pipeline and write one file per text.

    For each ``(lang_code, pipeline)`` in ``pipelines`` the batch is run
    through ``process_batch``; result ``i`` is written (followed by a newline)
    to ``output_folders[lang_code] / filenames_batch[i]``.

    Args:
        texts_batch: cleaned input texts.
        filenames_batch: output file names, parallel to ``texts_batch``.
        output_folders: mapping from pipeline key to output directory.
        pipelines: mapping from pipeline key to a callable pipeline.

    Raises:
        ValueError: if ``process_batch`` does not return exactly one result
            per file name. Results are paired with file names by position, so
            a count mismatch would leave files unwritten or attach text to
            the wrong file; failing before any write makes that visible.
    """
    for lang_code, pipeline in pipelines.items():
        print(f"Processing batch for {lang_code}")
        processed_texts = process_batch(texts_batch, pipeline)

        # Check alignment before touching the disk for this pipeline.
        if len(processed_texts) != len(filenames_batch):
            raise ValueError(
                f"process_batch returned {len(processed_texts)} results for "
                f"{len(filenames_batch)} files (pipeline '{lang_code}')"
            )

        for filename, processed_text in zip(filenames_batch, processed_texts):
            output_path = os.path.join(output_folders[lang_code], filename)
            with open(output_path, 'w', encoding='utf-8') as output_file:
                output_file.write(processed_text + '\n')

def process_batch(batch_texts, pipeline):
    """Lemmatise several texts in one pipeline call, one result per text.

    Each non-blank text is wrapped in its own ``stanza.Document`` and the
    list of documents is passed to ``pipeline`` in a single call. This
    replaces the earlier approach of joining all texts with a ' <<br>> '
    marker and splitting the output when a word equal to '<<br>>' showed up:
    that relied on the tokenizer emitting the marker as a single word, and it
    dropped a trailing empty result, so results could get out of step with
    the input texts.

    Per word, the flattening rules are:
      * ``upos`` is None -> skipped;
      * ``upos`` is a key of ``label_dict`` -> the mapped placeholder;
      * ``upos`` is 'PUNCT' -> skipped;
      * otherwise -> lower-cased lemma, or lower-cased text if the lemma is
        empty/None.

    Args:
        batch_texts: iterable of input strings.
        pipeline: callable accepting a list of ``stanza.Document`` objects and
            returning the processed documents in the same order.

    Returns:
        A list with exactly one space-joined string per input text, in input
        order. Blank (empty or whitespace-only) texts yield ''.
    """
    batch_texts = list(batch_texts)
    results = [''] * len(batch_texts)

    # Blank texts contain no words, so they keep '' and never reach the
    # pipeline; this also avoids calling the pipeline with an empty list.
    pending = [(index, text) for index, text in enumerate(batch_texts) if text.strip()]
    if not pending:
        return results

    in_docs = [stanza.Document([], text=text) for _, text in pending]
    out_docs = pipeline(in_docs)

    for (index, _), doc in zip(pending, out_docs):
        current_result = []
        for sentence in doc.sentences:
            for word in sentence.words:
                upos = word.upos
                if upos is None:
                    continue

                if upos in label_dict:
                    current_result.append(label_dict[upos])
                elif upos != 'PUNCT':
                    current_result.append((word.lemma or word.text).lower())
        results[index] = ' '.join(current_result)

    return results

In [40]:
# Lemmatise every .txt file in input_folder with all configured pipelines,
# flushing to disk after every 8 queued texts.
process_files_in_batches(input_folder, output_folders, pipelines, batch_size=8)

Processing batch for ancient-hebrew
Processing batch for hebrew-1
Processing batch for hebrew-2


In [43]:
import os
import re
import stanza

# Directory that holds the raw .txt files to lemmatise.
input_folder = 'example'

# Output directory for each pipeline key.
output_folders = {
    'ancient-hebrew': 'lemma_stanza_anc',
    'hebrew-1': 'lemma_stanza_HTB',
    'hebrew-2': 'lemma_stanza_IAHLT',
}

# Keyword arguments common to all three pipelines.
# NOTE(review): download_method=None presumably means the models must already
# exist under model_dir -- confirm against the Stanza documentation.
_shared_pipeline_kwargs = dict(
    processors='tokenize,mwt,pos,lemma',
    model_dir='stanza_resources',
    download_method=None,
    use_gpu=True,
)

# Same keys as output_folders; only the language / package selection differs.
pipelines = {
    'ancient-hebrew': stanza.Pipeline(lang='hbo', **_shared_pipeline_kwargs),
    'hebrew-1': stanza.Pipeline(lang='he', **_shared_pipeline_kwargs),
    'hebrew-2': stanza.Pipeline(lang='he', package='iahltwiki', **_shared_pipeline_kwargs),
}

2025-03-13 11:15:41 INFO: Loading these models for language: hbo (Ancient_Hebrew):
| Processor | Package       |
-----------------------------
| tokenize  | ptnk          |
| mwt       | ptnk          |
| pos       | ptnk_nocharlm |
| lemma     | ptnk_nocharlm |

2025-03-13 11:15:41 INFO: Using device: cpu
2025-03-13 11:15:41 INFO: Loading: tokenize
2025-03-13 11:15:41 INFO: Loading: mwt
2025-03-13 11:15:41 INFO: Loading: pos
2025-03-13 11:15:42 INFO: Loading: lemma
2025-03-13 11:15:42 INFO: Done loading processors!
2025-03-13 11:15:42 INFO: Loading these models for language: he (Hebrew):
| Processor | Package           |
---------------------------------
| tokenize  | combined          |
| mwt       | combined          |
| pos       | combined_charlm   |
| lemma     | combined_nocharlm |

2025-03-13 11:15:42 INFO: Using device: cpu
2025-03-13 11:15:42 INFO: Loading: tokenize
2025-03-13 11:15:42 INFO: Loading: mwt
2025-03-13 11:15:42 INFO: Loading: pos
2025-03-13 11:15:42 INFO: Loading

In [44]:
# Words tagged with one of these UPOS values are emitted as the mapped
# placeholder rather than as a lemma.
label_dict = dict(PROPN='person1', PRON='pron1', NUM='ordinal1')

# Create the output directories if they are missing; existing ones are fine.
for folder in output_folders.values():
    os.makedirs(folder, exist_ok=True)

def clean_text(text):
    """Return ``text`` with all characters outside the allowed set removed.

    Allowed: the U+0590-U+05FF range, the digits 0-9, whitespace, and the
    punctuation marks . , : ! ? ' " « » ( ) [ ] -
    Disallowed characters are simply deleted; nothing is inserted in their
    place and whitespace is not normalised.
    """
    disallowed = r'[^\u0590-\u05FF\u0030-\u0039\s\.,:!?\'"«»\(\)\[\]\-]'
    cleaned = re.sub(disallowed, '', text)
    return cleaned


def process_documents(documents, pipeline):
    """Lemmatise several texts with a single bulk pipeline call.

    Every non-blank text is wrapped in its own ``stanza.Document`` and the
    list is passed to ``pipeline`` at once. The words of each returned
    document are flattened as follows:
      * ``upos`` is None -> skipped;
      * ``upos`` is a key of ``label_dict`` -> the mapped placeholder;
      * ``upos`` is 'PUNCT' -> skipped;
      * otherwise -> lower-cased lemma, or lower-cased text if the lemma is
        empty/None.

    Args:
        documents: iterable of input strings.
        pipeline: callable accepting a list of ``stanza.Document`` objects and
            returning the processed documents in the same order.

    Returns:
        A list with one space-joined string per input text, in input order.
        Blank (empty or whitespace-only) texts yield ''; an empty
        ``documents`` yields [].
    """
    documents = list(documents)
    results = [''] * len(documents)

    # Blank texts contain no words, so they keep '' and are not sent to the
    # pipeline; this also avoids calling the pipeline with an empty list.
    pending = [(index, text) for index, text in enumerate(documents) if text.strip()]
    if not pending:
        return results

    in_docs = [stanza.Document([], text=text) for _, text in pending]
    out_docs = pipeline(in_docs)

    for (index, _), doc in zip(pending, out_docs):
        current_result = []
        for sentence in doc.sentences:
            for word in sentence.words:
                upos = word.upos
                if upos is None:
                    continue

                if upos in label_dict:
                    current_result.append(label_dict[upos])
                elif upos != 'PUNCT':
                    current_result.append((word.lemma or word.text).lower())
        results[index] = ' '.join(current_result)

    return results

def process_files_in_batches(input_folder, output_folders, pipelines, max_texts=500):
    """Read, clean and lemmatise the .txt files of ``input_folder`` in batches.

    File names are visited in the order ``os.listdir`` returns them; names
    not ending in '.txt' are ignored. Each file is read as UTF-8, stripped,
    passed through ``clean_text`` and queued. Once ``max_texts`` texts are
    queued they are handed to ``process_and_save`` together with their file
    names and the queue is emptied. Any remainder is flushed at the end.

    Args:
        input_folder: directory to scan for .txt files.
        output_folders: forwarded unchanged to ``process_and_save``.
        pipelines: forwarded unchanged to ``process_and_save``.
        max_texts: number of queued texts that triggers a flush.
    """
    queued_texts, queued_names = [], []

    for entry in os.listdir(input_folder):
        if entry.endswith('.txt'):
            source_path = os.path.join(input_folder, entry)
            with open(source_path, 'r', encoding='utf-8') as src:
                stripped = src.read().strip()

            queued_texts.append(clean_text(stripped))
            queued_names.append(entry)

            if len(queued_texts) >= max_texts:
                process_and_save(queued_texts, queued_names, output_folders, pipelines)
                # Empty the same list objects in place for the next batch.
                del queued_texts[:]
                del queued_names[:]

    if queued_texts:
        process_and_save(queued_texts, queued_names, output_folders, pipelines)


def process_and_save(texts_batch, filenames_batch, output_folders, pipelines):
    """Lemmatise one batch with every pipeline and write one file per result.

    For each ``(lang_code, pipeline)`` in ``pipelines`` the batch goes through
    ``process_documents``; result number ``i`` is written, followed by a
    newline, to ``output_folders[lang_code] / filenames_batch[i]`` (UTF-8,
    overwriting any existing file).

    Args:
        texts_batch: cleaned input texts.
        filenames_batch: output file names, parallel to ``texts_batch``.
        output_folders: mapping from pipeline key to output directory.
        pipelines: mapping from pipeline key to a callable pipeline.
    """
    for lang_code, nlp in pipelines.items():
        print(f"Processing batch for {lang_code}")
        lemmatised = process_documents(texts_batch, nlp)

        for position, flat_text in enumerate(lemmatised):
            # Results are matched to file names by position.
            target = os.path.join(output_folders[lang_code], filenames_batch[position])
            with open(target, 'w', encoding='utf-8') as sink:
                sink.write(flat_text + '\n')

# Lemmatise every .txt file in input_folder with all configured pipelines.
# max_texts=1000 overrides the function's default of 500 texts per batch.
process_files_in_batches(input_folder, output_folders, pipelines, max_texts=1000)

Processing batch for ancient-hebrew
Processing batch for hebrew-1
Processing batch for hebrew-2
