In [1]:
# Mount Google Drive so that the /content/drive/MyDrive/... paths used by the
# later cells (corpus, tokenizer and checkpoint directories) are reachable.
from google.colab import drive
drive.mount('/content/drive')
print("Google Drive mounted successfully.")

Mounted at /content/drive
Google Drive mounted successfully.


In [2]:
!pip install hazm nltk

Collecting hazm
  Downloading hazm-0.10.0-py3-none-any.whl.metadata (11 kB)
Collecting fasttext-wheel<0.10.0,>=0.9.2 (from hazm)
  Downloading fasttext_wheel-0.9.2-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (16 kB)
Collecting flashtext<3.0,>=2.7 (from hazm)
  Downloading flashtext-2.7.tar.gz (14 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting gensim<5.0.0,>=4.3.1 (from hazm)
  Downloading gensim-4.3.3-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (8.1 kB)
Collecting numpy==1.24.3 (from hazm)
  Downloading numpy-1.24.3-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (5.6 kB)
Collecting python-crfsuite<0.10.0,>=0.9.9 (from hazm)
  Downloading python_crfsuite-0.9.11-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (4.3 kB)
Collecting pybind11>=2.2 (from fasttext-wheel<0.10.0,>=0.9.2->hazm)
  Downloading pybind11-3.0.0-py3-none-any.whl.metadata (10.0 kB)
Collecting scipy<1.14.0,>=1.7.0

In [None]:
import os
from hazm import Normalizer, SentenceTokenizer, WordTokenizer
import re
import nltk
from tqdm.notebook import tqdm # For progress bar

# Download NLTK data (punkt for sentence tokenizer)
# NOTE(review): the cells visible here split sentences with Hazm's
# SentenceTokenizer only; confirm that this NLTK download is actually required.
nltk.download('punkt')

# Initialize Hazm tools once at module level; clean_text() and
# preprocess_legal_text_initial() read `normalizer` and `sentence_tokenizer`
# as globals.
normalizer = Normalizer()
sentence_tokenizer = SentenceTokenizer()
# NOTE(review): `word_tokenizer` is not referenced by any cell shown here.
word_tokenizer = WordTokenizer()

# Normalize a raw string and squeeze its whitespace
def clean_text(text):
    """Return `text` normalized by Hazm with all whitespace runs collapsed.

    The text is first passed through the module-level `normalizer`; every run
    of whitespace (newlines included) is then replaced by a single space and
    leading/trailing whitespace is removed.
    """
    normalized = normalizer.normalize(text)
    collapsed = re.sub(r'\s+', ' ', normalized)
    return collapsed.strip()

# Clean one raw legal text file and write it out as one sentence per line
def preprocess_legal_text_initial(file_path, output_dir):
    """Clean a single UTF-8 text file and save it sentence-per-line.

    The file is read in full, passed through `clean_text`, split with the
    module-level `sentence_tokenizer`, and the non-empty sentences are written
    to `<output_dir>/processed_<original file name>`, one per line.

    Args:
        file_path: Path of the raw text file to read.
        output_dir: Directory for the processed file; created if missing.

    Returns:
        True when the file was processed and written, False when the input
        file was not found or any other exception occurred (the error is
        printed rather than raised).
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as source:
            raw_text = source.read()

        # Strip every sentence and keep only those with content left.
        stripped = (s.strip() for s in sentence_tokenizer.tokenize(clean_text(raw_text)))
        kept_sentences = [s for s in stripped if s]

        # The output directory is only created once the input was read successfully.
        os.makedirs(output_dir, exist_ok=True)

        file_name = os.path.basename(file_path)
        output_file_path = os.path.join(output_dir, f"processed_{file_name}")

        with open(output_file_path, 'w', encoding='utf-8') as sink:
            sink.writelines(s + '\n' for s in kept_sentences)

        print(f"Successfully processed {file_name} and saved to {output_file_path}")
        return True
    except FileNotFoundError:
        print(f"Error: File not found at {file_path}")
    except Exception as e:
        print(f"Error processing {file_path}: {e}")
    return False

In [None]:
# Define your input directory for raw legal texts
input_legal_texts_dir = '/content/drive/MyDrive/my_legal_corpus' # ***** Change this path to the folder that holds your own raw files *****

# Define the output directory for initially processed text files
processed_output_dir = '/content/drive/MyDrive/processed_legal_texts_temp'
os.makedirs(processed_output_dir, exist_ok=True)

# Process all files in the input directory.
# Only names ending in ".txt" are handled; a file counts as processed when
# preprocess_legal_text_initial() returns True (failures are printed by that
# function and skipped here).
print(f"Starting initial preprocessing of files from: {input_legal_texts_dir}")
processed_files_count = 0
for filename in tqdm(os.listdir(input_legal_texts_dir), desc="Processing files"):
    if filename.endswith(".txt"): # Process only .txt files
        file_path = os.path.join(input_legal_texts_dir, filename)
        if preprocess_legal_text_initial(file_path, processed_output_dir):
            processed_files_count += 1

print(f"\nFinished initial preprocessing. Total files processed: {processed_files_count}")

In [None]:
# Path of the single corpus file that receives every processed sentence
final_combined_text_file_path = '/content/drive/MyDrive/all_legal_sentences.txt'

print(f"Combining all processed sentences into: {final_combined_text_file_path}")
total_lines = 0

# os.listdir() returns names in arbitrary order; sorting makes the combined
# corpus identical from one run to the next.
processed_file_names = sorted(
    name for name in os.listdir(processed_output_dir)
    if name.startswith("processed_") and name.endswith(".txt")
)

with open(final_combined_text_file_path, 'w', encoding='utf-8') as outfile:
    for filename in tqdm(processed_file_names, desc="Combining files"):
        filepath = os.path.join(processed_output_dir, filename)
        with open(filepath, 'r', encoding='utf-8') as infile:
            for line in infile:
                sentence = line.strip()
                if sentence: # Write only non-empty lines
                    # Always terminate with a newline so the last sentence of a
                    # file that lacks a trailing newline is not fused with the
                    # first sentence of the next file.
                    outfile.write(sentence + '\n')
                    total_lines += 1

print(f"\nAll processed sentences combined. Total lines written: {total_lines}")
print("Initial preprocessing and data consolidation complete!")

In [None]:
# NOTE(review): this cell repeats the consolidation step of the preceding cell
# (same input directory, same output path) and rewrites the same file;
# consider removing one of the two.

# Path of the single corpus file that receives every processed sentence
final_combined_text_file_path = '/content/drive/MyDrive/all_legal_sentences.txt'

print(f"Combining all processed sentences into: {final_combined_text_file_path}")
total_lines = 0

# os.listdir() returns names in arbitrary order; sorting makes the combined
# corpus identical from one run to the next.
processed_file_names = sorted(
    name for name in os.listdir(processed_output_dir)
    if name.startswith("processed_") and name.endswith(".txt")
)

with open(final_combined_text_file_path, 'w', encoding='utf-8') as outfile:
    for filename in tqdm(processed_file_names, desc="Combining files"):
        filepath = os.path.join(processed_output_dir, filename)
        with open(filepath, 'r', encoding='utf-8') as infile:
            for line in infile:
                sentence = line.strip()
                if sentence: # Write only non-empty lines
                    # Always terminate with a newline so the last sentence of a
                    # file that lacks a trailing newline is not fused with the
                    # first sentence of the next file.
                    outfile.write(sentence + '\n')
                    total_lines += 1

print(f"\nAll processed sentences combined. Total lines written: {total_lines}")
print("Initial preprocessing and data consolidation complete!")

In [None]:
!pip install transformers datasets accelerate tokenizers bitsandbytes

# بعد از اجرای این سلول، نیازی به ریستارت Runtime نیست چون محیط کاملاً تازه است و GPU فعال.
print("\nRequired Hugging Face libraries installed.")

In [None]:
from tokenizers import BertWordPieceTokenizer
from pathlib import Path
import os
from transformers import AutoTokenizer

# Combined sentence-per-line corpus in Google Drive (written by the consolidation cell)
text_file_path = '/content/drive/MyDrive/all_legal_sentences.txt'

# Ensure the directory for saving the tokenizer exists
tokenizer_output_dir = '/content/drive/MyDrive/custom_legal_bert_tokenizer'
os.makedirs(tokenizer_output_dir, exist_ok=True)

# WordPiece tokenizer that keeps the text as written: no lowercasing, no
# accent stripping and no special handling of CJK characters.
tokenizer = BertWordPieceTokenizer(
    clean_text=True,
    handle_chinese_chars=False,
    strip_accents=False,
    lowercase=False,
)

# Train the tokenizer
tokenizer.train(
    files=[text_file_path],
    vocab_size=30000, # You can adjust this based on your corpus size
    min_frequency=2,
    show_progress=True,
    special_tokens=["[PAD]", "[UNK]", "[CLS]", "[SEP]", "[MASK]"],
)

# save_model() writes the vocabulary only (vocab.txt). That alone does not
# tell AutoTokenizer which tokenizer class to build, and it does not record
# the normalisation options used for training.
tokenizer.save_model(tokenizer_output_dir)

# Re-save as a full Transformers tokenizer so the directory also contains
# tokenizer_config.json / tokenizer.json. BertTokenizerFast defaults to
# do_lower_case=True (which also enables accent stripping), so the training
# options are repeated explicitly to stay consistent with the learned vocab.
# model_max_length=512 matches the position-embedding size used for the model
# later in the notebook and gives truncation=True a concrete limit.
from transformers import BertTokenizerFast

hf_tokenizer = BertTokenizerFast.from_pretrained(
    tokenizer_output_dir,
    do_lower_case=False,
    strip_accents=False,
    tokenize_chinese_chars=False,
    model_max_length=512,
)
hf_tokenizer.save_pretrained(tokenizer_output_dir)

print(f"\nTokenizer training complete! Files saved to: {tokenizer_output_dir}")
print(f"Vocab size: {tokenizer.get_vocab_size()}")

# Optional: Load and test the tokenizer
loaded_tokenizer = AutoTokenizer.from_pretrained(tokenizer_output_dir)
test_sentence = "این یک نمونه متن حقوقی برای تست توکنایزر است. ماده ۱۲ قانون مجازات اسلامی."
encoded = loaded_tokenizer.encode_plus(test_sentence, add_special_tokens=True)
print(f"\nTest sentence: {test_sentence}")
print(f"Encoded IDs: {encoded.input_ids}")
print(f"Decoded tokens: {loaded_tokenizer.convert_ids_to_tokens(encoded.input_ids)}")

In [None]:
from transformers import AutoTokenizer
from datasets import Dataset, DatasetDict
import os

# Directory where the custom tokenizer was saved (by the tokenizer-training cell)
tokenizer_output_dir = '/content/drive/MyDrive/custom_legal_bert_tokenizer'

# Combined sentence-per-line corpus in Google Drive (written by the consolidation cell)
text_file_path = '/content/drive/MyDrive/all_legal_sentences.txt'

# 1. Load the custom tokenizer
# The tokenizer class and its normalisation options are stated explicitly so
# that loading works even when the directory holds only vocab.txt, and so the
# options match the ones the vocabulary was trained with (no lowercasing, no
# accent stripping, no CJK splitting). model_max_length=512 gives
# truncation=True a concrete limit.
from transformers import BertTokenizerFast

print("Loading custom tokenizer...")
tokenizer = BertTokenizerFast.from_pretrained(
    tokenizer_output_dir,
    do_lower_case=False,
    strip_accents=False,
    tokenize_chinese_chars=False,
    model_max_length=512,
)
print("Tokenizer loaded successfully.")

# 2. Load the dataset (one example per non-empty line of the corpus file)
print(f"Loading dataset from: {text_file_path}")
try:
    # Stream the file instead of readlines() so only the stripped,
    # non-empty lines are held in memory.
    with open(text_file_path, 'r', encoding='utf-8') as f:
        lines = [stripped for stripped in (line.strip() for line in f) if stripped]

    if not lines:
        raise ValueError(f"No non-empty lines found in '{text_file_path}'.")

    raw_dataset = Dataset.from_dict({"text": lines})
    dataset = DatasetDict({"train": raw_dataset})

    print("Dataset loaded successfully.")
    print(f"Dataset structure: {dataset}")

    print("\nSample from dataset:")
    print(dataset["train"][0])

except FileNotFoundError:
    print(f"Error: The file '{text_file_path}' was not found. Please ensure the path is correct and the file exists.")
    # Re-raise: later cells need `dataset`, so continuing would only fail
    # further down with a less helpful NameError.
    raise
except Exception as e:
    print(f"An error occurred while loading the dataset: {e}")
    raise

In [None]:
# Batch tokenization callback for Dataset.map
def tokenize_function(examples):
    """Tokenize the "text" column of a batch with the module-level `tokenizer`.

    Truncation is requested without an explicit max_length, so the limit that
    applies is whatever maximum length the loaded tokenizer itself defines.
    """
    batch_texts = examples["text"]
    encoded_batch = tokenizer(batch_texts, truncation=True)
    return encoded_batch

# Tokenize `dataset` in batches with tokenize_function and drop the raw "text"
# column, so only the columns returned by the tokenizer remain.
print("Tokenizing dataset...")
tokenized_datasets = dataset.map(
    tokenize_function,
    batched=True,
    num_proc=os.cpu_count(), # Use multiple processes if available for faster tokenization
    remove_columns=["text"],
)
print("Dataset tokenized successfully.")
print(f"Tokenized dataset structure: {tokenized_datasets}")


# Length, in tokens, of each fixed-size block produced by group_texts
block_size = 128 # You can increase this to 256 or 512 if your GPU memory allows and you want longer contexts

def group_texts(examples):
    """Concatenate a batch of tokenized examples and cut it into equal blocks.

    Args:
        examples: Mapping from column name (e.g. "input_ids") to a list of
            per-example token lists, as passed by a batched ``Dataset.map``.

    Returns:
        A dict with the same columns, where each column is a list of chunks of
        exactly ``block_size`` items, plus a "labels" column that is a copy of
        the "input_ids" chunk list. Items left over after the last full block
        are dropped, so a batch shorter than ``block_size`` yields no blocks.
    """
    # Imported locally so the function is self-contained when it is shipped to
    # worker processes by Dataset.map(num_proc=...).
    from itertools import chain

    # chain.from_iterable flattens in linear time; sum(lists, []) re-copies the
    # accumulated list on every addition and is quadratic in the batch size.
    concatenated_examples = {k: list(chain.from_iterable(examples[k])) for k in examples.keys()}
    total_length = len(concatenated_examples[list(examples.keys())[0]])
    # Round down to a whole number of blocks.
    total_length = (total_length // block_size) * block_size
    result = {
        k: [t[i : i + block_size] for i in range(0, total_length, block_size)]
        for k, t in concatenated_examples.items()
    }
    result["labels"] = result["input_ids"].copy()
    return result

# Apply group_texts batch-wise so every resulting row is one block of
# `block_size` tokens. Grouping happens per batch: tokens left over at the end
# of a batch that do not fill a whole block are dropped by group_texts.
print(f"Grouping texts into blocks of size {block_size}...")
lm_datasets = tokenized_datasets.map(
    group_texts,
    batched=True,
    num_proc=os.cpu_count(),
)
print("Text grouping complete.")
print(f"Final dataset structure for MLM: {lm_datasets}")

# Show one prepared example as a sanity check.
print("\nSample of prepared MLM dataset (first example):")
print(lm_datasets["train"][0])

In [None]:
from transformers import BertConfig, BertForMaskedLM, TrainingArguments, Trainer, DataCollatorForLanguageModeling
import torch
import os

# 3. Configure the BERT model from scratch
# This creates a new, randomly initialised BERT model with the specified
# configuration (no pretrained weights are loaded in this cell).
# The globals `tokenizer` and `lm_datasets` come from the earlier cells.
print("Initializing BERT configuration and model...")
config = BertConfig(
    vocab_size=tokenizer.vocab_size, # Use the vocab size from your trained tokenizer
    max_position_embeddings=512, # Max sequence length for the model
    num_hidden_layers=6, # Number of transformer layers (you can increase this to 12 for base BERT if GPU allows)
    num_attention_heads=6, # Number of attention heads (you can increase this to 12 for base BERT if GPU allows)
    hidden_size=384, # Dimension of the hidden layers (you can increase this to 768 for base BERT if GPU allows); 384 / 6 heads = 64 per head
    type_vocab_size=2,
)

model = BertForMaskedLM(config=config)
print(f"Number of parameters in the new model: {model.num_parameters()}")


# 4. Set up Training Arguments
output_dir = '/content/drive/MyDrive/legal_bert_pretraining_output' # Directory to save checkpoints and logs
os.makedirs(output_dir, exist_ok=True)

# NOTE(review): checkpoints are written every 10,000 steps and train() below is
# called without resume_from_checkpoint, so a restarted session begins again
# from step 0 - confirm this is intended for long Colab runs.
print("Setting up Training Arguments...")
training_args = TrainingArguments(
    output_dir=output_dir,
    overwrite_output_dir=True,
    num_train_epochs=5, # Number of training epochs (iterations over the dataset) - Adjust based on corpus size and convergence
    per_device_train_batch_size=32, # Batch size per GPU/CPU - Adjust based on GPU memory
    save_steps=10_000, # Save model checkpoint every 10,000 steps
    save_total_limit=2, # Keep only the last 2 checkpoints
    prediction_loss_only=True,
    logging_steps=100, # Log training progress every 100 steps
    learning_rate=5e-5,
    weight_decay=0.01,
    do_train=True,
    gradient_accumulation_steps=1, # Number of updates steps to accumulate before performing a backward/update pass.
    # Optionally enable mixed precision training for faster training and less memory usage (if using GPU)
    # fp16=True, # For NVIDIA GPUs
    # bf16=True, # For hardware with bfloat16 support (e.g., A100) - TODO confirm for the GPU in use
)


# Data collator for Masked Language Modeling
print("Setting up Data Collator for Masked Language Modeling...")
data_collator = DataCollatorForLanguageModeling(
    tokenizer=tokenizer,
    mlm=True,
    mlm_probability=0.15 # 15% of tokens will be masked for prediction
)

# 5. Initialize the Trainer and start pre-training
# Only a training set is supplied; no evaluation dataset is configured here.
print("Initializing Trainer and starting pre-training...")
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=lm_datasets["train"],
    data_collator=data_collator,
)

print("\n--- Starting BERT Pre-training ---")
# Start training! This will take a long time depending on your dataset size and GPU.
trainer.train()
print("\nBERT Pre-training completed!")

# Save the final model together with its tokenizer in <output_dir>/final_model
final_model_save_path = os.path.join(output_dir, "final_model")
model.save_pretrained(final_model_save_path)
tokenizer.save_pretrained(final_model_save_path) # Save tokenizer with the model
print(f"\nFinal pre-trained model and tokenizer saved to: {final_model_save_path}")