# Data Preprocessing

In [None]:
import os
import pymupdf
import psycopg2
from dotenv import load_dotenv

### Load environment variables

In [None]:
# Load variables from the local ".env" file; the return value of load_dotenv
# is used as the success flag for the status message.
if load_dotenv(".env"):
    print("Loading successful")
else:
    print("An error has occurred. Make sure the file exists and is readable")

Loading successful


## Database Functions

### Establish connection & Table Setup

In [None]:
def establish_connection():
    """Connect to PostgreSQL and make sure the working tables exist.

    Connection settings are read from the ``POSTGRES_DBNAME``,
    ``POSTGRES_USER``, ``POSTGRES_PASSWORD``, ``POSTGRES_HOST`` and
    ``POSTGRES_PORT`` environment variables. The ``passages`` and
    ``processed_files`` tables are created when ``information_schema.tables``
    does not list them.

    Returns:
        A ``(conn, cur)`` tuple on success. On any error the cursor and
        connection that were opened so far are closed and ``(None, None)``
        is returned.
    """
    # Bind both names up front: if psycopg2.connect() itself fails, the
    # cleanup code below must still be able to test them without raising
    # UnboundLocalError.
    conn = None
    cur = None

    try:
        conn = psycopg2.connect(
            dbname=os.getenv("POSTGRES_DBNAME"),
            user=os.getenv("POSTGRES_USER"),
            password=os.getenv("POSTGRES_PASSWORD"),
            host=os.getenv("POSTGRES_HOST"),
            port=os.getenv("POSTGRES_PORT"),
        )
        print("Successfully connected to PostgreSQL")
        cur = conn.cursor()

        # Checking if the table 'passages' exists
        print("Checking for 'passages' table")
        cur.execute("""
            SELECT EXISTS (
                SELECT FROM information_schema.tables
                WHERE table_name = 'passages'
            );
        """)

        table_exists = cur.fetchone()[0]
        if not table_exists:
            print("The table 'passages' does not exist. Creating one.")
            cur.execute("""
                CREATE TABLE passages (
                    passage_id SERIAL PRIMARY KEY,
                    title TEXT,
                    text TEXT
                )
            """)
            conn.commit()
            print("'passages' table created.")
        else:
            print("'passages' table already exists.")

        # Create 'processed_files' tracking table if it doesn't exist
        print("Checking for 'processed_files' tracking table")
        cur.execute("""
            SELECT EXISTS (
                SELECT FROM information_schema.tables
                WHERE table_name = 'processed_files'
            );
        """)
        processed_table_exists = cur.fetchone()[0]

        if not processed_table_exists:
            print("The table 'processed_files' does not exist. Creating one.")
            cur.execute("""
                CREATE TABLE processed_files (
                    id SERIAL PRIMARY KEY,
                    filename TEXT UNIQUE NOT NULL,
                    processed_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
                );
            """)
            conn.commit()
            print("'processed_files' table created.")
        else:
            print("'processed_files' table already exists.")

        print("Database setup completed")
        return conn, cur

    # Catches errors related to the database
    except psycopg2.Error as e:
        print(f"An error has occurred during connecting to PostgreSQL or when creating tables: {e}")

    # Catches any other potential errors
    except Exception as e:
        print(f"An unexpected error has occurred: {e}")

    # Only reached after a failure: release whatever was opened.
    if cur:
        cur.close()
    if conn:
        conn.close()

    return None, None

### Retrieves processed filenames from the database

In [14]:
def get_processed_files(cur):
    """Return the filenames stored in the ``processed_files`` table.

    Args:
        cur: A database cursor, or a falsy value when none is available.

    Returns:
        A set built from the first column of every fetched row. The set is
        empty when ``cur`` is falsy or when the query raises
        ``psycopg2.Error``.
    """
    if not cur:
        print("Database cursor not available.")
        return set()

    try:
        cur.execute("SELECT filename FROM processed_files")
        return {record[0] for record in cur.fetchall()}

    except psycopg2.Error as e:
        print(f"An error has occurred when fetching processed files: {e}")
        return set()

### Add a filename to the processed_files tracking table

In [15]:
def mark_file_as_processed(conn, cur, filename):
    """Record ``filename`` in the ``processed_files`` tracking table.

    The insert uses ``ON CONFLICT (filename) DO NOTHING``, so a filename that
    is already recorded does not raise.

    Args:
        conn: Database connection used to commit or roll back.
        cur: Cursor used to run the insert.
        filename: Name of the file to record.

    Returns:
        True when the insert was committed. False when either handle is
        falsy or when ``psycopg2.Error`` is raised (the transaction is rolled
        back in that case).
    """
    if not (cur and conn):
        print(f"Database connection or cursor is not available for {filename}")
        return False

    insert_statement = """
                    INSERT INTO processed_files (filename)
                    VALUES (%s) ON CONFLICT (filename) 
                    DO NOTHING"""

    try:
        cur.execute(insert_statement, (filename,))
        conn.commit()

    except psycopg2.Error as e:
        print(f"Error marking file {filename} as processed: {e}")
        conn.rollback()
        return False

    else:
        return True

### Extract text from a single PDF file

In [None]:
def extract_text_from_PDF(file_path):
    """Extract a title and body text from a single PDF file.

    The text of all pages is concatenated, split into lines, and blank lines
    are dropped. The first remaining line is used as the title; the other
    lines are joined with single spaces to form the body.

    Args:
        file_path: Path of the PDF file to read.

    Returns:
        ``(title, text)`` on success. When the PDF yields no text at all, the
        filename without its extension is returned as the title together with
        an empty body. ``(None, None)`` is returned when any exception is
        raised while reading the file.
    """
    filename = os.path.basename(file_path)

    try:
        with pymupdf.open(file_path) as pdf_document:
            full_text = " ".join(page.get_text() for page in pdf_document).strip()

        lines = [line.strip() for line in full_text.splitlines() if line.strip()]

        if not lines:
            print(f"An error has occurred. No text content extract from '{filename}'.")

            # splitext drops only the trailing extension, whatever its case;
            # str.replace(".pdf", "") missed ".PDF" and could also strip
            # ".pdf" from the middle of a name.
            return os.path.splitext(filename)[0], ""

        title = lines[0]
        # Join with a space so the last word of one line is not fused with
        # the first word of the next one.
        text = " ".join(lines[1:])

        if not text:
            print(f"Warning. No content extracted from the file {filename}")

        return title, text

    except Exception as e:
        print(f"An error has occurred from {file_path}: {e}")

        return None, None

### Load the extracted text into the database and remove older duplicates

In [17]:
def load_text_into_db(conn, cur, title, text):
    """Insert one passage and delete older rows with the same title and text.

    Args:
        conn: Database connection used to commit or roll back.
        cur: Cursor used to run the statements.
        title: Passage title to store.
        text: Passage body to store.

    Returns:
        The ``passage_id`` of the inserted row, or None when either handle is
        falsy or an exception is raised (the transaction is rolled back in
        that case).
    """
    if not (cur and conn):
        print("Database connection or cursor is not available at the moment")
        return None

    insert_statement = """
                    INSERT INTO passages (title, text)
                    VALUES (%s, %s)
                    RETURNING passage_id"""
    purge_statement = """
                    DELETE FROM passages 
                    WHERE passage_id < %s
                    AND title = %s
                    AND text = %s;
                    """

    try:
        # Store the new passage first so its ID can bound the duplicate purge.
        cur.execute(insert_statement, (title, text))
        new_passage_id = cur.fetchone()[0]
        print(f"Inserted text: '{title}' (Temporary ID: {new_passage_id})")

        # Remove rows with a smaller ID that carry the same title and text.
        cur.execute(purge_statement, (new_passage_id, title, text))
        removed = cur.rowcount
        if removed > 0:
            print(f"Remove {removed} older duplicates for '{title}'.")

        conn.commit()
        print("Duplicates removed successfully!")

        return new_passage_id

    # Database related errors
    except psycopg2.Error as e:
        print(f"An error has occurred during loading text for '{title}': {e}")
        conn.rollback()
        return None

    # Anything else
    except Exception as e:
        print(f"An unexpected error has occurred: {e}")
        conn.rollback()
        return None

### Chunk text and update the database if there are new inserted files

In [18]:
def _split_into_chunks(text, chunk_size, overlap):
    """Split ``text`` into windows of at most ``chunk_size`` characters.

    Consecutive windows share ``overlap`` characters. Splitting stops as soon
    as a window reaches the end of the text, so no trailing chunk consisting
    only of already-covered overlap is produced.
    """
    step = chunk_size - overlap
    chunks = []
    start = 0

    while start < len(text):
        end = start + chunk_size
        chunks.append(text[start:end])
        if end >= len(text):
            break
        start += step

    return chunks


def chunk_passage_and_update_db(conn, cur, passage_id_to_be_replaced, title, full_text,
                                chunk_size=1000, overlap=100):
    """Replace one stored passage with overlapping chunks of its text.

    The row identified by ``passage_id_to_be_replaced`` is deleted and one row
    per chunk is inserted with the title ``"<title> (chunk i/n)"``. The delete
    and all inserts are committed together.

    Args:
        conn: Database connection used to commit or roll back.
        cur: Cursor used to run the statements.
        passage_id_to_be_replaced: ID of the full-text row to replace.
        title: Title of the original passage.
        full_text: Text to split into chunks.
        chunk_size: Maximum number of characters per chunk.
        overlap: Number of characters shared by consecutive chunks.

    Returns:
        True when the chunks were committed. False when the ID or text is
        missing, a handle is falsy, or an exception is raised while updating
        the database (the transaction is rolled back in that case).

    Raises:
        ValueError: If ``chunk_size`` is not positive or ``overlap`` is not in
            ``[0, chunk_size)``; such values cannot cover the whole text.
    """
    if chunk_size <= 0 or not 0 <= overlap < chunk_size:
        raise ValueError("chunk_size must be positive and overlap must satisfy 0 <= overlap < chunk_size")

    # Input validation
    if not passage_id_to_be_replaced or not full_text:
        print(f"Skipping chunking for passage ID: {passage_id_to_be_replaced} ('{title}') "
              f"due to missing ID or missing text.")
        return False

    if not cur or not conn:
        print("Database connection or curser currently unavailable for chunking")
        return False

    try:
        # full_text is non-empty here, so at least one chunk is produced.
        chunks = _split_into_chunks(full_text, chunk_size, overlap)

        # Delete original passage, then insert chunks
        print(f"Replacing passage with ID {passage_id_to_be_replaced} with {len(chunks)} chunks.")
        cur.execute("""
                    DELETE FROM passages WHERE passage_id = %s
                    """, (passage_id_to_be_replaced,))

        if cur.rowcount == 0:
            print(f"Passage ID {passage_id_to_be_replaced} not found for deletion befor chunking. Proceed with chunk insertion.")
        else:
            print(f"Deleted original passage ID: {passage_id_to_be_replaced}")

        # Insert new chunks
        inserted_chunks_id = []
        for i, chunk_text in enumerate(chunks):
            chunk_title = f"{title} (chunk {i + 1}/{len(chunks)})"
            cur.execute("""
                        INSERT INTO passages (title, text) 
                        VALUES (%s, %s)
                        RETURNING passage_id""", (chunk_title, chunk_text))

            inserted_chunks_id.append(cur.fetchone()[0])

        conn.commit()
        print(f"Succesfully replaced passage ID {passage_id_to_be_replaced} with {len(chunks)} "
              f"chunks (New ID: {inserted_chunks_id}).")

        return True

    # Catches any potential database errors
    except psycopg2.Error as e:
        print(f"An error has occurred during chunking or updating for original passage ID {passage_id_to_be_replaced} "
              f"('{title}'): {e}")

        conn.rollback()
        return False

    # Catches any other potential errors
    except Exception as e:
        print(f"An unexpected error has occurred during chunking for original passage ID {passage_id_to_be_replaced} "
              f"('{title}'): {e}")

        conn.rollback()
        return False

### Scan the folder for new PDFs and process them incrementally

In [19]:
def process_data_incremental(conn, cur):
    """Process every PDF in the source folder that is not yet recorded.

    The folder is read from the ``PDF_FOLDER_PATH`` environment variable. For
    each ``.pdf`` file whose name is not in ``processed_files``, the text is
    extracted, stored as one passage, replaced by chunks, and the filename is
    then recorded as processed. A summary of processed, skipped, and failed
    files is printed at the end.

    Args:
        conn: Database connection passed on to the helper functions.
        cur: Cursor passed on to the helper functions.

    Returns:
        None. Progress and errors are reported with ``print``.
    """
    if not cur or not conn:
        print("Database connection or cursor currently unavailable. Incremental process cancelled")
        return

    # Validate the source folder before querying the database. The variable
    # may be unset, in which case os.path.isdir(None) would raise TypeError.
    folder_path = os.getenv("PDF_FOLDER_PATH")
    if not folder_path or not os.path.isdir(folder_path):
        print(f"An error has occurred. Source folder not found at '{folder_path}'. Please check the path again.")
        return

    # Begin incremental process
    processed_filenames = get_processed_files(cur)
    print(f"\n Found {len(processed_filenames)} files previously processed")

    files_processed_count = 0
    files_skipped_count = 0
    files_error_count = 0

    try:
        pdf_files_in_folder = [f for f in os.listdir(folder_path) if f.lower().endswith('.pdf')]
        print(f"Found {len(pdf_files_in_folder)} PDF files in the target folder: {folder_path}")

    except OSError as e:
        print(f"An error has occurred during listing the files in folder '{folder_path}': {e}")
        return

    # Loop through each PDF inside the directory
    for filename in pdf_files_in_folder:
        if filename in processed_filenames:
            files_skipped_count += 1
            continue

        # Processing for a new file
        print(f"\n Processing new file: {filename}")
        file_path = os.path.join(folder_path, filename)
        title, text_content = extract_text_from_PDF(file_path)

        # (None, None) signals that extraction failed completely
        if title is None or text_content is None:
            print(f"Skipping file '{filename}' due to extraction error")
            files_error_count += 1
            continue

        # Text was extracted but contains nothing except whitespace
        if not text_content.split():
            print(f"Skipping file '{filename}' because extracted text content is empty")
            files_error_count += 1
            continue

        print(f"Extracted text from '{filename}'. Title '{title[:50]}'")

        # Save the full text to the database temporarily
        print(f"Load the full text for '{filename}' into the database temporarilly")
        temp_passage_id = load_text_into_db(conn, cur, title, text_content)

        if not temp_passage_id:
            print(f"Loading text into the Database failed for '{filename}'. Chunking process cannot be "
                  "processed. File not marked as processed")
            # Count this failure like every other one so the summary is accurate.
            files_error_count += 1
            continue

        print(f"Full text loaded (ID: {temp_passage_id}). Attempting to chunk")

        if not chunk_passage_and_update_db(conn, cur, temp_passage_id, title, text_content):
            print(f"Chunking failed for '{filename}'. File not marked as processed.")
            files_error_count += 1
            continue

        print(f"Chunking successful for '{filename}'. Proceed with the marking process")

        if mark_file_as_processed(conn, cur, filename):
            files_processed_count += 1
            print(f"Successfully processed and marked '{filename}'.")
        else:
            print(f"Chunking is succesful for '{filename}' but marking it as processed FAILED. Manual checking is required.")
            files_error_count += 1

    print("\n Incremental processing completed")
    print(f"\n New files processed successfully: {files_processed_count}")
    print(f"\n Files skipped (already processed): {files_skipped_count}")
    print(f"\n Files with errors (skipped/failed): {files_error_count}")

## Main Execution Block

In [20]:
if __name__ == "__main__":
    # Open the database connection, run the incremental import, and always
    # close the cursor and connection afterwards.
    conn_main, cur_main = establish_connection()

    if not (conn_main and cur_main):
        # Setup failed, so there is nothing to process or close.
        print("Script exiting: Database connection could not be established during setup.")
    else:
        try:
            process_data_incremental(conn_main, cur_main)

        except Exception as e:
            print(f"An unexpected error occurred during the main script execution: {e}")

        finally:
            # Runs whether or not processing raised.
            print("\nClosing database connection")
            try:
                if cur_main:
                    cur_main.close()
                if conn_main:
                    conn_main.close()
                print("Database connection closed.")
            except psycopg2.Error as e:
                print(f"Error closing database connection: {e}")

Successfully connected to PostgreSQL
Checking for 'passages' table
'passages' table already exists.
Checking for 'processed_files' tracking table
'processed_files' table already exists.
Database setup completed

 Found 19 files previously processed
Found 19 PDF files in the target folder: D:\IELTs Practice Web Platform\Training samples (IELTs)

 Incremental processing completed

 New files processed successfully: 0

 Files skipped (already processed): 19

 Files with errors (skipped/failed): 0

Closing database connection
Database connection closed.
