In [None]:
!pip install pytesseract pdf2image opencv-python-headless numpy langdetect pymupdf pandas sentence-transformers faiss-cpu flask tqdm

## 1. Downloading the documents

In [None]:
import os
import pandas as pd
import requests
from urllib.parse import urlparse
import pytesseract
from pdf2image import convert_from_path
import cv2
import numpy as np
from langdetect import detect
import fitz  # PyMuPDF
import re
import logging

# Configure logging: every record goes both to a log file and to the
# notebook / console output.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        # Explicit UTF-8: log messages include document names with accented
        # French or Arabic characters, which a platform-default encoding
        # may be unable to write.
        logging.FileHandler("ocr_processing.log", encoding="utf-8"),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

# Load the Excel file and download files
def download_files_by_type(excel_path, download_base_dir="downloads", verify_ssl=False, timeout=30):
    """Download every document listed in an Excel sheet into per-type folders.

    The sheet must provide the columns ``titre`` (used as file name), ``url``
    and ``type``. Each file is stored as ``<download_base_dir>/<type>/<name>``.
    When the URL path carries an extension that the title lacks, that
    extension is appended to the file name.

    Downloading is best-effort: a failure on one row is printed and the loop
    moves on to the next row.

    Args:
        excel_path: Path of the Excel file describing the documents.
        download_base_dir: Root directory for the downloads (created if needed).
        verify_ssl: Forwarded to ``requests.get(verify=...)``. The default
            ``False`` disables TLS certificate verification.
        timeout: Seconds to wait for the server (forwarded to ``requests.get``)
            so that a stalled connection cannot hang the whole batch.
    """
    df = pd.read_excel(excel_path)
    os.makedirs(download_base_dir, exist_ok=True)

    for index, row in df.iterrows():
        file_name = row['titre']
        file_url = row['url']
        file_type = row['type']

        # A row without URL or type can neither be fetched nor filed; skip it
        # instead of letting the path/URL handling below raise.
        if pd.isna(file_url) or pd.isna(file_type) or str(file_url).strip() == "":
            print(f"Skipping row {index}: missing url or type")
            continue
        file_url = str(file_url)
        file_type = str(file_type)

        type_dir = os.path.join(download_base_dir, file_type)
        os.makedirs(type_dir, exist_ok=True)

        # Derive file extension from URL if possible
        url_path = urlparse(file_url).path
        ext_from_url = os.path.splitext(url_path)[1].lower()  # e.g., ".pdf"

        # Clean filename or fallback to default
        if pd.isna(file_name) or str(file_name).strip() == "":
            file_name = os.path.basename(url_path)
            if not file_name:
                file_name = f"file_{index}{ext_from_url if ext_from_url else '.bin'}"
        else:
            file_name = str(file_name)
            # Ensure file_name has proper extension
            if ext_from_url and not file_name.lower().endswith(ext_from_url):
                file_name += ext_from_url

        download_path = os.path.join(type_dir, file_name)
        # Stream into a temporary sibling file and rename on success, so an
        # interrupted transfer never leaves a truncated file under the final
        # name (and never clobbers a good copy from an earlier run).
        partial_path = download_path + ".part"
        print(f"Downloading {file_url} to {download_path}")

        try:
            # The context manager releases the connection even on errors.
            with requests.get(file_url, stream=True, verify=verify_ssl, timeout=timeout) as response:
                response.raise_for_status()
                with open(partial_path, 'wb') as f:
                    for chunk in response.iter_content(chunk_size=8192):
                        if chunk:
                            f.write(chunk)
            os.replace(partial_path, download_path)
            print(f"Successfully downloaded {file_name}")
        except Exception as e:
            print(f"Error downloading {file_url}: {e}")
            if os.path.exists(partial_path):
                try:
                    os.remove(partial_path)
                except OSError:
                    pass

# Document processor: extracts text from the downloaded PDFs, falling back to OCR for scanned files
class DocumentProcessor:
    """Convert the PDFs listed in an Excel sheet into plain-text files.

    For every row of the sheet the processor either copies an already parsed
    text file (rows whose ``langue`` is ``'ar'``) or extracts the text of the
    downloaded PDF, falling back to Tesseract OCR when direct extraction
    yields fewer than 100 characters. One ``.txt`` file per document is written
    to ``output_dir`` together with a ``processing_results.csv`` summary.
    """

    # Default folder holding the pre-parsed text of the Arabic documents.
    DEFAULT_ARABIC_TXT_DIR = "/kaggle/input/legal-documents/legal_text/parsed_text"

    def __init__(self, excel_path, download_base_dir="/kaggle/working/downloads", output_dir="processed_texts",
                 arabic_txt_source_dir=None):
        """Load the document sheet and prepare the output directory.

        Args:
            excel_path: Excel file with the columns ``titre``, ``type`` and,
                optionally, ``langue``.
            download_base_dir: Root folder containing ``<type>/<titre>.pdf``.
            output_dir: Folder receiving the text files (created if needed).
            arabic_txt_source_dir: Folder with the pre-parsed Arabic ``.txt``
                files; defaults to ``DEFAULT_ARABIC_TXT_DIR``.
        """
        self.excel_path = excel_path
        self.download_base_dir = download_base_dir
        self.output_dir = output_dir
        self.arabic_txt_source_dir = arabic_txt_source_dir or self.DEFAULT_ARABIC_TXT_DIR
        os.makedirs(output_dir, exist_ok=True)
        self.df = pd.read_excel(excel_path)
        # If Tesseract is not on PATH (e.g. on Windows), point pytesseract at it:
        # pytesseract.pytesseract.tesseract_cmd = r'C:\Program Files\Tesseract-OCR\tesseract.exe'

    def process_all_documents(self):
        """Process every row of the sheet and return a per-document report.

        Returns:
            pandas.DataFrame with one row per document (``file_name``,
            ``language``, ``text_file``, ``status`` and, for failures,
            ``error_message``). The same table is saved as
            ``processing_results.csv`` in ``output_dir``.
        """
        results = []

        for index, row in self.df.iterrows():
            # Reset per row so the error handler below can never report the
            # name of the previous document.
            file_name = None
            try:
                file_name = row['titre']
                file_type = row['type']
                language = row.get('langue', 'fr')
                if pd.isna(language):
                    # An empty cell means "no language given": use the default.
                    language = 'fr'

                if pd.isna(file_name) or file_name == "":
                    file_name = f"file_{index}.bin"

                if not file_name.lower().endswith('.pdf'):
                    file_name += '.pdf'

                base_name = os.path.splitext(file_name)[0]
                text_file_name = base_name + ".txt"
                text_file_path = os.path.join(self.output_dir, text_file_name)

                if language == 'ar':
                    # For Arabic, just copy the existing parsed text file
                    source_txt_path = os.path.join(self.arabic_txt_source_dir, text_file_name)
                    if os.path.exists(source_txt_path):
                        with open(source_txt_path, 'r', encoding='utf-8') as src_file:
                            content = src_file.read()
                        with open(text_file_path, 'w', encoding='utf-8') as dst_file:
                            dst_file.write(content)
                        logger.info(f"Copied parsed Arabic text for {file_name}")
                        results.append({
                            'file_name': file_name,
                            'language': language,
                            'text_file': text_file_path,
                            'status': 'copied from parsed_text'
                        })
                    else:
                        logger.warning(f"Parsed Arabic file not found: {source_txt_path}")
                        results.append({
                            'file_name': file_name,
                            'language': language,
                            'text_file': None,
                            'status': 'missing parsed Arabic text'
                        })
                    continue  # Skip further processing for Arabic

                # For French or others, process the downloaded PDF
                file_path = os.path.join(self.download_base_dir, file_type, file_name)
                if not os.path.exists(file_path):
                    logger.warning(f"File not found: {file_path}")
                    # Record the miss so the report accounts for every row.
                    results.append({
                        'file_name': file_name,
                        'language': language,
                        'text_file': None,
                        'status': 'file not found'
                    })
                    continue

                # file_name always ends with '.pdf' at this point (see above),
                # so no other format can reach this call.
                text = self.process_pdf(file_path, language)
                with open(text_file_path, 'w', encoding='utf-8') as f:
                    f.write(text)
                results.append({
                    'file_name': file_name,
                    'language': language,
                    'text_file': text_file_path,
                    'status': 'processed'
                })
                logger.info(f"Successfully processed {file_name}")

            except Exception as e:
                label = file_name if isinstance(file_name, str) else f"document_{index}"
                logger.error(f"Error processing document {label}: {str(e)}")
                results.append({
                    'file_name': label,
                    'status': 'error',
                    'error_message': str(e)
                })

        results_df = pd.DataFrame(results)
        results_df.to_csv(os.path.join(self.output_dir, "processing_results.csv"), index=False)
        return results_df

    def process_pdf(self, pdf_path, language):
        """Return the text of one PDF.

        For ``language == 'ar'`` the pre-parsed text file named after the PDF
        is returned (empty string when it is missing or unreadable). Otherwise
        the text layer is extracted directly, and OCR is applied when that
        yields fewer than 100 non-blank characters.
        """
        base_name = os.path.splitext(os.path.basename(pdf_path))[0]

        if language == 'ar':
            # Skip processing and load the already prepared text instead
            translated_path = os.path.join(self.arabic_txt_source_dir, f"{base_name}.txt")
            if os.path.exists(translated_path):
                logger.info(f"Loading Arabic translation for {pdf_path} from {translated_path}")
                try:
                    with open(translated_path, 'r', encoding='utf-8') as f:
                        return f.read()
                except Exception as e:
                    logger.error(f"Error reading translated Arabic file: {e}")
                    return ""
            else:
                logger.warning(f"Translation not found for Arabic file: {base_name}")
                return ""

        # For French documents, proceed as normal
        extracted_text = self.extract_text_from_pdf(pdf_path)
        if not extracted_text or len(extracted_text.strip()) < 100:
            # Too little text: most likely a scanned document without text layer.
            logger.info(f"Applying OCR to {pdf_path} (language: {language})")
            extracted_text = self.apply_ocr(pdf_path, language)
        else:
            logger.info(f"Text successfully extracted directly from {pdf_path}")
        return extracted_text

    def extract_text_from_pdf(self, pdf_path):
        """Return the concatenated text of all pages, or "" on any error."""
        try:
            # The context manager closes the document even when a page fails.
            with fitz.open(pdf_path) as pdf_document:
                return "".join(page.get_text() for page in pdf_document)
        except Exception as e:
            logger.error(f"Error extracting text directly from PDF: {str(e)}")
            return ""

    def apply_ocr(self, pdf_path, language):
        """Rasterise the PDF and run Tesseract on each page.

        Pages that fail OCR are logged and skipped. Returns the cleaned text of
        all pages, or "" when the PDF cannot be converted to images.
        """
        tesseract_lang = 'ara' if language == 'ar' else 'fra'
        try:
            images = convert_from_path(pdf_path)
        except Exception as e:
            logger.error(f"Error converting PDF to images: {str(e)}")
            return ""
        full_text = ""
        for i, image in enumerate(images):
            logger.info(f"Processing page {i+1}/{len(images)} of {pdf_path}")
            # Reverse the channel axis for OpenCV (assumes a 3-channel RGB page image).
            open_cv_image = np.array(image)[:, :, ::-1].copy()
            if language == 'ar':
                preprocessed = self.preprocess_image_arabic(open_cv_image)
            else:
                preprocessed = self.preprocess_image_french(open_cv_image)
            try:
                config = '--psm 6 --oem 1' if language == 'ar' else ''
                text = pytesseract.image_to_string(preprocessed, lang=tesseract_lang, config=config)
                full_text += text + "\n\n"
            except Exception as e:
                logger.error(f"OCR error on page {i+1}: {str(e)}")
        full_text = self.clean_text(full_text, language)
        return full_text

    def preprocess_image_arabic(self, image):
        """Grayscale and binarise a BGR page image with a fixed threshold of 150."""
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        _, binary = cv2.threshold(gray, 150, 255, cv2.THRESH_BINARY)
        # NOTE: with a 1x1 kernel the dilation leaves the image unchanged.
        kernel = np.ones((1, 1), np.uint8)
        dilated = cv2.dilate(binary, kernel, iterations=1)
        return dilated

    def preprocess_image_french(self, image):
        """Grayscale, adaptively threshold and denoise a BGR page image."""
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        binary = cv2.adaptiveThreshold(
            gray, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY, 11, 2
        )
        denoised = cv2.fastNlMeansDenoising(binary, None, 10, 7, 21)
        return denoised

    def clean_text(self, text, language):
        """Normalise OCR output.

        Collapses every whitespace run to one space, drops single punctuation
        characters standing alone between whitespace, then turns remaining
        ``|`` into ``I``. ``language`` is accepted but currently not used.
        """
        text = re.sub(r'\s+', ' ', text)
        text = re.sub(r'\s[^\w\s]\s', ' ', text)
        text = text.replace('|', 'I')
        return text

In [None]:
# Driver: download the documents, sanity-check the result, then extract text.
if __name__ == "__main__":
    excel_file = "/kaggle/input/legal-documents/legal_text/documents_fiscaux.xlsx"
    download_dir = "/kaggle/working/downloads"
    output_dir = "/kaggle/working/processed_texts"

    # Download all files
    download_files_by_type(excel_file, download_base_dir=download_dir, verify_ssl=False)

    # Check if files were downloaded
    print("Checking downloaded files:")
    file_count = 0
    for root, dirs, files in os.walk(download_dir):
        for file in files:
            file_count += 1
            print(f"Found file: {os.path.join(root, file)}")

    if file_count == 0:
        print("No files were downloaded. Check download errors.")
        # `exit()` is an interactive-shell helper and not meant for programs;
        # raising SystemExit aborts the run reliably.
        raise SystemExit(1)

    # Debug Excel file reading
    print("\nAttempting to read Excel file:")
    try:
        df = pd.read_excel(excel_file)
        print(f"Excel file read successfully. Shape: {df.shape}")
        print(f"Columns: {df.columns.tolist()}")
        print(f"First 3 rows:")
        print(df.head(3))

        # Check if required columns exist
        required_columns = ['titre', 'type', 'url']
        missing_columns = [col for col in required_columns if col not in df.columns]
        if missing_columns:
            print(f"Warning: Missing required columns: {missing_columns}")

        # Debug file path construction
        print("\nAttempting to match downloaded files with Excel entries:")
        for index, row in df.iterrows():
            if index > 5:  # Limit to first few rows for debugging
                break

            print(f"Row {index}:")
            file_name = row.get('titre', None)
            file_type = row.get('type', None)

            print(f"  titre: {file_name}")
            print(f"  type: {file_type}")

            if pd.isna(file_type):
                # Without a type there is no folder to look into.
                print("  Skipping path check: missing type")
                continue

            if pd.isna(file_name) or file_name == "":
                file_name = f"file_{index}.bin"

            # Mirror DocumentProcessor, which always looks for a '.pdf' name;
            # otherwise this check reports existing files as missing.
            file_name = str(file_name)
            if not file_name.lower().endswith('.pdf'):
                file_name += '.pdf'

            expected_path = os.path.join(download_dir, str(file_type), file_name)
            print(f"  Looking for file: {expected_path}")
            print(f"  File exists: {os.path.exists(expected_path)}")
    except Exception as e:
        print(f"Error reading Excel file: {str(e)}")

    # Run processing
    try:
        processor = DocumentProcessor(
            excel_path=excel_file,
            download_base_dir=download_dir,
            output_dir=output_dir
        )
        results = processor.process_all_documents()
        print(f"Processing completed with {len(results)} files. Results saved to {output_dir}/processing_results.csv")
    except Exception as e:
        print(f"Error during document processing: {str(e)}")

In [5]:
import pandas as pd
import os
import json

def generate_metadata_json(xlsx_path, processed_texts_dir, output_json_path):
    """Merge the Excel metadata with the extracted texts into one JSON file.

    Each row of the sheet becomes one JSON object holding all of its columns
    plus a ``text`` key with the content of the matching ``.txt`` file from
    ``processed_texts_dir`` (``None`` when that file does not exist).

    Args:
        xlsx_path: Excel file describing the documents (uses column ``titre``).
        processed_texts_dir: Folder containing the extracted ``.txt`` files.
        output_json_path: Destination of the JSON list.
    """
    # Load metadata Excel
    df = pd.read_excel(xlsx_path)

    all_entries = []

    for index, row in df.iterrows():
        # Empty cells arrive as NaN/NaT, which json would emit as the invalid
        # token `NaN`; store them as null instead.
        entry = {
            key: (None if pd.api.types.is_scalar(value) and pd.isna(value) else value)
            for key, value in row.to_dict().items()
        }
        file_name = row.get('titre')

        if pd.isna(file_name) or file_name == "":
            # Same fallback name as DocumentProcessor, so the text file it
            # wrote for an untitled row ("file_<index>.bin.txt") is found.
            file_name = f"file_{index}.bin"
        file_name = str(file_name)

        if not file_name.lower().endswith('.pdf'):
            file_name += '.pdf'

        base_name = os.path.splitext(file_name)[0]
        txt_file_name = base_name + ".txt"
        txt_file_path = os.path.join(processed_texts_dir, txt_file_name)

        try:
            with open(txt_file_path, 'r', encoding='utf-8') as f:
                entry['text'] = f.read()
        except FileNotFoundError:
            entry['text'] = None
            print(f"Warning: Text file not found for {file_name}")

        all_entries.append(entry)

    # Save to JSON; `default=str` serialises cells json cannot encode natively
    # (e.g. dates read from Excel).
    with open(output_json_path, 'w', encoding='utf-8') as json_file:
        json.dump(all_entries, json_file, ensure_ascii=False, indent=2, default=str)

    print(f"✅ JSON file saved to: {output_json_path}")


In [6]:
# Assemble the final dataset: sheet metadata joined with the extracted texts.
generate_metadata_json(
    output_json_path="/kaggle/working/legal_documents_dataset.json",
    processed_texts_dir="/kaggle/working/processed_texts",
    xlsx_path="/kaggle/input/legal-documents/legal_text/documents_fiscaux.xlsx",
)


✅ JSON file saved to: /kaggle/working/legal_documents_dataset.json
