In [1]:
import pandas as pd
import psycopg2
from datasets import load_dataset
from psycopg2.extras import execute_batch
from tqdm import tqdm

In [2]:
import os
from dotenv import load_dotenv

# Load environment variables from the .env file
load_dotenv()

True

In [3]:
def create_db_connection(db_host: str, db_name: str, db_user: str, db_password: str, db_port: str = "5432"):
    """Create and return database connection and cursor"""
    conn = psycopg2.connect(
        host=db_host,
        database=db_name,
        user=db_user,
        password=db_password,
        port=db_port
    )
    return conn, conn.cursor()

In [4]:
# Database connection details
db_host = os.getenv("SUPABASE_POSTGRES_HOST")
db_name = os.getenv("SUPABASE_POSTGRES_DATABASE")
db_user = os.getenv("SUPABASE_POSTGRES_USER")
db_password = os.getenv("SUPABASE_POSTGRES_PASSWORD")

In [5]:
def insert_source(cursor, source_name):
    """Insert a source and return its ID."""
    cursor.execute(
        "INSERT INTO sources (source_name) VALUES (%s) ON CONFLICT (source_name) DO UPDATE SET source_name = EXCLUDED.source_name RETURNING source_id",
        (source_name,)
    )
    return cursor.fetchone()[0]

In [6]:
def insert_chapter(cursor, source_id, chapter_no, chapter_name):
    """Insert a chapter and return its ID."""
    cursor.execute(
        """
        INSERT INTO chapters (source_id, chapter_no, chapter_name)
        VALUES (%s, %s, %s)
        ON CONFLICT (source_id, chapter_no) DO UPDATE 
        SET chapter_name = EXCLUDED.chapter_name
        RETURNING chapter_id
        """,
        (source_id, chapter_no, chapter_name)
    )
    return cursor.fetchone()[0]

In [7]:
def insert_chain(cursor, chain_index):
    """Insert a narration chain and return its ID."""
    # If chain_index is None or empty, use a default value
    if not chain_index or str(chain_index).strip() == '':
        chain_index = 'Unknown Chain'

    cursor.execute(
        """
        INSERT INTO narration_chains (chain_index)
        VALUES (%s)
        ON CONFLICT (chain_index) DO UPDATE 
        SET chain_index = EXCLUDED.chain_index
        RETURNING chain_id
        """,
        (chain_index,)
    )
    return cursor.fetchone()[0]

In [8]:
def clean_text(text):
    """Clean and validate text fields."""
    if not text or str(text).strip() == '':
        return 'Text not available'
    return str(text).strip()

In [9]:
# Load the dataset
dataset = load_dataset("M-AI-C/all_hadith")
df = pd.DataFrame(dataset["train"])

In [10]:
df.head()

Unnamed: 0,source,chapter_no,hadith_no,chapter,chain_indx,text_ar,text_en
0,Sahih Bukhari,1,1,Revelation - كتاب بدء الوحى,"30418, 20005, 11062, 11213, 11042, 3",حدثنا الحميدي عبد الله بن الزبير، قال حدثنا سف...,Narrated 'Umar bin Al-Khattab: ...
1,Sahih Bukhari,1,2,Revelation - كتاب بدء الوحى,"30355, 20001, 11065, 10511, 53",حدثنا عبد الله بن يوسف، قال أخبرنا مالك، عن هش...,Narrated 'Aisha: ...
2,Sahih Bukhari,1,3,Revelation - كتاب بدء الوحى,"30399, 20023, 11207, 11013, 10511, 53",حدثنا يحيى بن بكير، قال حدثنا الليث، عن عقيل، ...,Narrated 'Aisha: (the m...
3,Sahih Bukhari,1,4,Revelation - كتاب بدء الوحى,"11013, 10567, 34",قال ابن شهاب وأخبرني أبو سلمة بن عبد الرحمن، أ...,Narrated Jabir bin 'Abdullah Al-Ansari while ...
4,Sahih Bukhari,1,5,Revelation - كتاب بدء الوحى,"20040, 20469, 11399, 11050, 17",حدثنا موسى بن إسماعيل، قال حدثنا أبو عوانة، قا...,Narrated Said bin Jubair: ...


In [11]:
df.columns

Index(['source', 'chapter_no', 'hadith_no', 'chapter', 'chain_indx', 'text_ar',
       'text_en'],
      dtype='object')

In [12]:
# Create database connection
conn, cursor = create_db_connection(db_host, db_name, db_user, db_password)

try:
    # Process each row in the dataset
    print("Processing dataset and inserting into database...")

    # Prepare data structures to cache IDs
    source_ids = {}
    chapter_ids = {}
    chain_ids = {}

    # Prepare batch inserts for hadiths
    hadith_data = []

    # Get the dataset size for the progress bar
    total_rows = len(dataset['train'])

    for row in tqdm(dataset['train'], total=total_rows):
        try:
            # Get or create source ID
            source_name = row.get('source', 'Unknown Source')
            if source_name not in source_ids:
                source_ids[source_name] = insert_source(
                    cursor, source_name)
            source_id = source_ids[source_name]

            # Get or create chapter ID
            chapter_no = row.get('chapter_no', 0)
            if not isinstance(chapter_no, (int, float)):
                chapter_no = 0

            chapter_key = (source_id, chapter_no)
            if chapter_key not in chapter_ids:
                chapter_ids[chapter_key] = insert_chapter(
                    cursor,
                    source_id,
                    chapter_no,
                    row.get('chapter', f'Chapter {chapter_no}')
                )
            chapter_id = chapter_ids[chapter_key]

            # Get or create chain ID
            chain_index = row.get('chain_indx', 'Unknown Chain')
            chain_key = str(
                chain_index) if chain_index else 'Unknown Chain'
            if chain_key not in chain_ids:
                chain_ids[chain_key] = insert_chain(cursor, chain_key)
            chain_id = chain_ids[chain_key]

            # Clean and prepare hadith texts
            text_ar = clean_text(row.get('text_ar', ''))
            text_en = clean_text(row.get('text_en', ''))

            # Get hadith number, default to 0 if not present
            hadith_no = row.get('hadith_no', 0)
            if not isinstance(hadith_no, (int, float)):
                hadith_no = 0

            # Prepare hadith data for batch insert
            hadith_data.append((
                chapter_id,
                chain_id,
                hadith_no,
                text_ar,
                text_en
            ))

            # Batch insert every 1000 records
            if len(hadith_data) >= 1000:
                execute_batch(
                    cursor,
                    """
                    INSERT INTO hadiths (chapter_id, chain_id, hadith_no, text_ar, text_en)
                    VALUES (%s, %s, %s, %s, %s)
                    ON CONFLICT (chapter_id, hadith_no) DO UPDATE
                    SET chain_id = EXCLUDED.chain_id,
                        text_ar = EXCLUDED.text_ar,
                        text_en = EXCLUDED.text_en
                    """,
                    hadith_data
                )
                hadith_data = []
                conn.commit()

        except Exception as e:
            print(f"Error processing row: {row}")
            print(f"Error details: {str(e)}")
            continue

    # Insert any remaining hadiths
    if hadith_data:
        execute_batch(
            cursor,
            """
            INSERT INTO hadiths (chapter_id, chain_id, hadith_no, text_ar, text_en)
            VALUES (%s, %s, %s, %s, %s)
            ON CONFLICT (chapter_id, hadith_no) DO UPDATE
            SET chain_id = EXCLUDED.chain_id,
                text_ar = EXCLUDED.text_ar,
                text_en = EXCLUDED.text_en
            """,
            hadith_data
        )
        conn.commit()

    print("Data import completed successfully!")

except Exception as e:
    conn.rollback()
    print(f"An error occurred: {str(e)}")
    raise

finally:
    cursor.close()
    conn.close()

Processing dataset and inserting into database...


100%|██████████| 34441/34441 [15:14<00:00, 37.65it/s] 


Data import completed successfully!
