In [None]:
import json
import os
from concurrent.futures import ThreadPoolExecutor
from tqdm import tqdm
import json
import re

# Fitlers The Hadiths into 2 books only Sahih Bukhari and Muslim

In [None]:
import json
import os

# تحديد مسار ملف الأحاديث
input_file = 'all_hadiths_explanations2.json'

# التأكد من وجود الملف
if not os.path.exists(input_file):
    print(f"الملف {input_file} غير موجود")
else:
    # قراءة الملف
    with open(input_file, 'r', encoding='utf-8') as f:
        all_hadiths = json.load(f)
    
    print(f"تم قراءة {len(all_hadiths)} حديث من الملف")
    
    # الكتب المطلوبة للفلترة
    target_books = ["صحيح البخاري", "صحيح مسلم"]
    
    # فلترة الأحاديث بناءً على قيمة book في data
    filtered_hadiths = []
    for hadith_item in all_hadiths:
        # التحقق من أن الحديث يحتوي على explanation.data.book
        if "explanation" in hadith_item and \
           "data" in hadith_item["explanation"] and \
           "book" in hadith_item["explanation"]["data"] and \
           hadith_item["explanation"]["data"]["book"] in target_books:
            filtered_hadiths.append(hadith_item)
    
    print(f"عدد الأحاديث الكلي: {len(all_hadiths)}")
    print(f"عدد الأحاديث بعد الفلترة: {len(filtered_hadiths)}")
    
    # توزيع الأحاديث حسب الكتاب
    book_counts = {}
    for book in target_books:
        count = sum(1 for hadith in filtered_hadiths if hadith["explanation"]["data"]["book"] == book)
        book_counts[book] = count
    
    for book, count in book_counts.items():
        print(f"عدد الأحاديث في {book}: {count}")
    
    # حفظ الأحاديث المفلترة في ملف جديد
    output_file = 'sahihain_explanationsV2.json'
    
    with open(output_file, 'w', encoding='utf-8') as f:
        json.dump(filtered_hadiths, f, ensure_ascii=False, indent=4)
    
    print(f"تم حفظ {len(filtered_hadiths)} حديث في ملف {output_file}")
    
    # عرض عينة من الأحاديث المفلترة
    sample_size = min(5, len(filtered_hadiths))
    sample_hadiths = filtered_hadiths[:sample_size]
    
    print(f"عينة من {sample_size} أحاديث من صحيح البخاري وصحيح مسلم:")
    for i, hadith in enumerate(sample_hadiths, 1):
        hadith_data = hadith["explanation"]["data"]
        print(f"\n{i}. كتاب: {hadith_data.get('book')}")
        print(f"   راوي: {hadith_data.get('rawi')}")
        print(f"   الحديث: {hadith_data.get('hadith', '')[:100]}...")

# Deleting unnecessary Keys in the datasets

In [None]:
import json
import os

def transform_hadith_data(input_file, output_file=None):
    """
    تحويل بيانات الحديث من الشكل المعقد إلى الشكل المبسط
    
    Args:
        input_file: مسار ملف الإدخال
        output_file: مسار ملف الإخراج (اختياري)
    
    Returns:
        list: قائمة بالأحاديث المعالجة
    """
    # قراءة الملف
    try:
        with open(input_file, 'r', encoding='utf-8') as f:
            data = json.load(f)
    except Exception as e:
        print(f"خطأ في قراءة الملف: {e}")
        return []
    
    # قائمة لتخزين الأحاديث المعالجة
    processed_hadiths = []
    
    # تحديد إذا كان الملف يحتوي على قائمة أو حديث واحد
    if isinstance(data, list):
        items = data
    elif isinstance(data, dict) and "hadith_id" in data:
        items = [data]
    else:
        print("صيغة الملف غير متوافقة")
        return []
    
    # معالجة كل عنصر
    for item in items:
        # إنشاء كائن الحديث المبسط
        simplified_hadith = {}
        
        # إضافة معرف الحديث
        if "hadith_id" in item:
            simplified_hadith["hadith_id"] = item["hadith_id"]
        
        # استخراج البيانات من الهيكل المعقد
        explanation_data = None
        if "explanation" in item and "data" in item["explanation"]:
            explanation_data = item["explanation"]["data"]
        
        # استخراج البيانات الأساسية
        if explanation_data:
            basic_fields = ["hadith", "rawi", "mohdith", "book", "numberOrPage", "grade", "takhrij"]
            for field in basic_fields:
                if field in explanation_data:
                    simplified_hadith[field] = explanation_data[field]
            
            # استخراج الشرح
            if "hasSharhMetadata" in explanation_data and explanation_data["hasSharhMetadata"] and "sharhMetadata" in explanation_data:
                sharh_metadata = explanation_data["sharhMetadata"]
                if "sharh" in sharh_metadata:
                    simplified_hadith["sharh"] = sharh_metadata["sharh"]
        
        # إضافة الحديث المعالج إلى القائمة
        if simplified_hadith:
            processed_hadiths.append(simplified_hadith)
    
    # حفظ النتيجة إذا تم تحديد ملف الإخراج
    if output_file and processed_hadiths:
        try:
            # إذا كان هناك حديث واحد فقط، نحفظه كعنصر واحد وليس قائمة
            if len(processed_hadiths) == 1:
                with open(output_file, 'w', encoding='utf-8') as f:
                    json.dump(processed_hadiths[0], f, ensure_ascii=False, indent=2)
            else:
                with open(output_file, 'w', encoding='utf-8') as f:
                    json.dump(processed_hadiths, f, ensure_ascii=False, indent=2)
            print(f"تم حفظ {len(processed_hadiths)} حديث في الملف: {output_file}")
        except Exception as e:
            print(f"خطأ في حفظ الملف: {e}")
    
    return processed_hadiths

def process_directory(input_dir, output_dir):
    """
    معالجة كل ملفات JSON في مجلد معين
    
    Args:
        input_dir: مسار مجلد الإدخال
        output_dir: مسار مجلد الإخراج
    """
    # التأكد من وجود مجلد الإخراج
    os.makedirs(output_dir, exist_ok=True)
    
    # عدد الملفات التي تمت معالجتها
    processed_count = 0
    
    # معالجة كل ملف JSON في المجلد
    for filename in os.listdir(input_dir):
        if filename.endswith('.json'):
            input_path = os.path.join(input_dir, filename)
            output_path = os.path.join(output_dir, f"processed_{filename}")
            
            # معالجة الملف
            hadiths = transform_hadith_data(input_path, output_path)
            
            if hadiths:
                processed_count += 1
    
    print(f"تمت معالجة {processed_count} ملف من مجلد {input_dir}")

# مثال للاستخدام
if __name__ == "__main__":
    # معالجة ملف واحد
    input_file = "sahihain_explanationsV2.json"
    output_file = "processed_all_hadithsV2.json"
    
    transform_hadith_data(input_file, output_file)
    
    # معالجة مجلد كامل
    # process_directory("input_folder", "output_folder")

# Changing the Hadith_id to Numberorpage ID 

In [None]:
import json

# تحميل البيانات
with open('processed_El_Sahihain.json', 'r', encoding='utf-8') as f:
    processed_El_Sahihain = json.load(f)

# التأكد من وجود hadith_id في كل حديث
for hadith in processed_El_Sahihain:
    hadith_id = hadith.get('numberOrPage')
    if hadith_id:
        hadith['hadith_id'] = hadith_id

# الترتيب التصاعدي حسب hadith_id (مع تحويله إلى رقم إن أمكن)
processed_El_Sahihain_sorted = sorted(
    processed_El_Sahihain,
    key=lambda h: int(h.get('hadith_id', 0)) if str(h.get('hadith_id', '')).isdigit() else float('inf')
)

# حفظ الملف المرتب
with open('sahih_bukhari_updated.json', 'w', encoding='utf-8') as f:
    json.dump(processed_El_Sahihain_sorted, f, ensure_ascii=False, indent=2)


# Cleaning the Sharh using the repeated Keywords 

In [None]:
class HadithCleaner:
    def __init__(self, explanation_markers, isnad_markers):
        self.explanation_markers = explanation_markers
        self.isnad_markers = isnad_markers

    def extract_explanation(self, sharh):
        # Step 1: Remove the hadith text if it appears at the beginning
        hadith_text_end = sharh.find('\n     الراوي :')
        if hadith_text_end != -1:
            sharh = sharh[hadith_text_end:]
        
        # Step 2: Remove metadata section
        metadata_pattern = re.compile(r'\s*الراوي :.*?التخريج :.*?\n\s*\n', re.DOTALL)
        match = metadata_pattern.search(sharh)
        if match:
            after_metadata = sharh[match.end():]
        else:
            isnad_regex = r'(.*?)\bالتخريج\s*:.*?\n+'
            match = re.search(isnad_regex, sharh, re.DOTALL)
            after_metadata = sharh[match.end():] if match else sharh

        # Step 3: Process text parts
        parts = [p.strip() for p in after_metadata.split('\n\n') if p.strip()]
        if parts and not any(marker in parts[0] for marker in self.isnad_markers):
            return parts[0]
        
        # Step 4: Look for explanation markers
        for marker in self.explanation_markers:
            idx = after_metadata.find(marker)
            if idx != -1:
                return after_metadata[idx:].strip()
        
        # Step 5: Find substantial paragraph
        for para in parts:
            if len(para) > 50 and not any(marker in para for marker in self.isnad_markers):
                return para
        
        return after_metadata.strip()

    def process_hadith(self, hadith):
        cleaned = dict(hadith)
        if 'sharh' in hadith and hadith['sharh']:
            cleaned['sharh'] = self.extract_explanation(hadith['sharh'])
        return cleaned

    def clean_hadiths(self, input_path, output_path):
        """
        Clean hadiths from input file and save to output file.
        
        Args:
            input_path (str): Path to input JSON file
            output_path (str): Path to save cleaned output JSON file
        
        Returns:
            list: Cleaned hadiths
        """
        print(f"Loading hadiths from {input_path}...")
        with open(input_path, 'r', encoding='utf-8') as f:
            hadiths = json.load(f)
        print(f"Loaded {len(hadiths)} hadiths.")

        cleaned_hadiths = []
        print("Processing hadiths...")
        with ThreadPoolExecutor() as executor:
            futures = list(tqdm(
                executor.map(self.process_hadith, hadiths),
                total=len(hadiths),
                desc="Cleaning hadiths",
                unit="hadith"
            ))
            cleaned_hadiths.extend(futures)

        print(f"Saving cleaned hadiths to {output_path}...")
        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(cleaned_hadiths, f, ensure_ascii=False, indent=2)

        print(f"✓ Successfully processed and saved {len(cleaned_hadiths)} hadiths.")
        return cleaned_hadiths


In [3]:
# Expanded list of explanation markers based on analysis
EXPLANATION_MARKERS = [
    'هذا الحديثُ', 'مُناسبةُ هذا الحديثِ', 'وفي هذا الحديثِ', 'وفي هذا الأثرِ', 'وفي الحديثِ',
    'وفيه:', 'وفي هذا يَقولُ', 'وفي هذا الأثَرِ', 'وفي هذا الحَديثِ', 'وفي هذا يَحكي',
    'وفي هذا يَذكُرُ', 'وفي هذا يَروي', 'وفي هذا يَخبِرُ', 'وفي هذا يَقولُ',
    'في هذا الحديثِ', 'يُبيِّن', 'يَدُلُّ', 'أخبر', 'بيَّن', 'يُخبر', 'يُبين', 'يَروي', 
    'تَروي', 'يَذكر', 'تَذكر', 'يَحكي', 'تَحكي'
]

ISNAD_MARKERS = [
    'الراوي', 'المحدث', 'المصدر', 'الصفحة أو الرقم', 'خلاصة حكم المحدث', 'التخريج'
]


In [4]:
HadithCleaner= HadithCleaner(
    explanation_markers=EXPLANATION_MARKERS,
    isnad_markers=ISNAD_MARKERS
)

In [None]:
HadithCleaner.clean_hadiths("sahih_muslim.json", "sahih_muslim_cleaned2.json")

Loading hadiths from sahih_muslim.json...
Loaded 4288 hadiths.
Processing hadiths...


In [None]:
HadithCleaner.clean_hadiths("sahih_bukhari.json", "sahih_bukhari_cleaned2.json")

# Spliting each Headith to josn files

In [None]:
class HadithSplitter:
    def __init__(self, input_file, output_dir):
        """
        Initialize the HadithSplitter with input and output paths.
        
        Args:
            input_file (str): Path to input JSON file containing hadiths
            output_dir (str): Directory path where individual hadith files will be saved
        """
        self.input_file = input_file
        self.output_dir = output_dir

    def sanitize_filename(self, filename):
        """
        Sanitize filename by removing invalid characters.
        
        Args:
            filename (str): Original filename
            
        Returns:
            str: Sanitized filename
        """
        sanitized = re.sub(r'[\\/*?:"<>|]', '_', str(filename))
        sanitized = sanitized.replace(' ', '_')
        return sanitized

    def create_output_directory(self):
        """Create output directory if it doesn't exist."""
        if not os.path.exists(self.output_dir):
            os.makedirs(self.output_dir)
            print(f"Created directory: {self.output_dir}")

    def load_hadiths(self):
        """
        Load hadiths from input JSON file.
        
        Returns:
            list: List of hadith dictionaries
        """
        print("Loading hadiths from file...")
        with open(self.input_file, 'r', encoding='utf-8') as f:
            hadiths = json.load(f)
        print(f"Found {len(hadiths)} hadiths to process.")
        return hadiths

    def split_hadiths(self):
        """Split hadiths into individual files."""
        self.create_output_directory()
        hadiths = self.load_hadiths()

        for hadith in tqdm(hadiths, desc="Splitting hadiths", unit="hadith"):
            hadith_id = hadith.get('hadith_id', 'unknown')
            safe_filename = self.sanitize_filename(hadith_id)
            output_file = os.path.join(self.output_dir, f"{safe_filename}.json")
            
            with open(output_file, 'w', encoding='utf-8') as f:
                json.dump(hadith, f, ensure_ascii=False, indent=2)

        print(f"✓ Successfully split {len(hadiths)} hadiths into individual files in {self.output_dir}")
        
        # Display sample of created files
        print("\nSample of files created:")
        for filename in sorted(os.listdir(self.output_dir))[:5]:
            print(f"  - {filename}")

In [None]:

# Example usage:
if __name__ == "__main__":
    # For Bukhari
    bukhari_splitter = HadithSplitter(
        input_file='bukhari_cleaned_hadiths.json',
        output_dir='d:\\hadith after Q&As\\Sahih_bukhari'
    )
    bukhari_splitter.split_hadiths()
    
    # For Muslim
    muslim_splitter = HadithSplitter(
        input_file='sahih_muslim_cleaned_hadiths.json',
        output_dir='d:\\hadith after Q&As\\Sahih_muslim'
    )
    muslim_splitter.split_hadiths()

# Display all original and cleaned sharh for comparison and save to JSON


In [None]:
comparison = []
for idx, (original, cleaned) in enumerate(zip(hadiths_BU, clean_hadiths_BU)):
    original_sharh = original.get('sharh', '')
    cleaned_sharh = cleaned.get('sharh', '')
    comparison.append({
        "hadith_index": idx,
        "hadith_id": original.get('hadith_id', ''),
        "original_sharh": original_sharh,
        "cleaned_sharh": cleaned_sharh,
        "sharh_match": original_sharh == cleaned_sharh
    })
    print(f"Hadith Index: {idx}")
    print("Original Sharh:")
    print(original_sharh)
    print("\nCleaned Sharh:")
    print(cleaned_sharh)
    print("\nSharh Match:", original_sharh == cleaned_sharh)
    print("-" * 80)

# Save comparison to JSON
with open('Bukhari_sharh_comparison.json', 'w', encoding='utf-8') as f:
    json.dump(comparison, f, ensure_ascii=False, indent=2)