### RECURSIVE

1. Import Library

In [None]:
import fitz  # PyMuPDF untuk membaca file PDF
import os  # Modul os untuk operasi sistem file
import re  # Modul re untuk ekspresi reguler

2. Fungsi clean_text

In [None]:
def clean_text(text):
    """
    Membersihkan dan merapikan teks hasil ekstraksi.
    """
    text = re.sub(r'\s+', ' ', text)  # Mengganti spasi berlebih dengan satu spasi
    text = re.sub(r'(\w+)-\s*\n\s*(\w+)', r'\1\2', text)  # Menggabungkan kata yang terputus di akhir baris
    text = re.sub(r'\n+', ' ', text)  # Mengganti newline dengan spasi
    return text.strip()  # Menghapus spasi di awal dan akhir teks

3. Fungsi load_pdf_text

In [None]:
def load_pdf_text(pdf_path):
    """
    Load dan ekstrak teks dari PDF.
    """
    try:
        doc = fitz.open(pdf_path)  # Membuka file PDF
        full_text = ""  # Inisialisasi string untuk menyimpan teks
        
        for page_num in range(len(doc)):  # Iterasi melalui setiap halaman
            page = doc.load_page(page_num)  # Memuat halaman
            page_text = page.get_text("text")  # Mengambil teks dari halaman
            page_text = clean_text(page_text)  # Membersihkan teks
            full_text += page_text + " "  # Menambahkan teks halaman ke full_text
        
        return clean_text(full_text)  # Mengembalikan teks yang telah dibersihkan
    except Exception as e:
        print(f"Terjadi kesalahan: {e}")  # Menangani kesalahan saat membuka file
        return ""  # Mengembalikan string kosong jika terjadi kesalahan

4. Fungsi recursive_chunk

In [None]:
def recursive_chunk(text, chunk_size, separators=None):
    """
    Membagi teks secara rekursif menjadi chunk dengan panjang maksimum menggunakan separator yang ditentukan.
    """
    if not text.strip():  # Jika teks kosong, kembalikan daftar kosong
        return []

    if separators is None:  # Jika tidak ada separator yang ditentukan, gunakan titik sebagai default
        separators = ["."]
    
    for separator in separators:  # Iterasi melalui setiap separator
        if separator in text[:chunk_size]:  # Jika separator ditemukan dalam batas chunk_size
            split_index = text[:chunk_size].rfind(separator) + len(separator)  # Temukan indeks pemisah terakhir
            chunk = text[:split_index]  # Ambil chunk
            remaining_text = text[split_index:].strip()  # Ambil sisa teks
            return [chunk] + recursive_chunk(remaining_text, chunk_size, separators)  # Kembalikan chunk dan sisa teks yang dipecah

    chunk = text[:chunk_size]  # Jika tidak ada separator ditemukan, ambil chunk berdasarkan ukuran
    remaining_text = text[chunk_size:].strip()  # Ambil sisa teks
    return [chunk] + recursive_chunk(remaining_text, chunk_size, separators)  # Kembalikan chunk dan sisa teks yang dipecah

5. Fungsi save_chunks_to_file

In [None]:
def save_chunks_to_file(chunks, output_file):
    """
    Menyimpan semua chunk ke dalam satu file.
    """
    with open(output_file, "w", encoding="utf-8") as f:  # Membuka file untuk menulis
        for idx, chunk in enumerate(chunks, 1):  # Iterasi melalui setiap chunk
            f.write(f"Chunk {idx:03d}:\n")  # Menulis header untuk chunk
            f.write(chunk)  # Menulis isi chunk
            f.write("\n\n")  # Menambahkan newline setelah setiap chunk
    
    print(f"Semua chunks telah disimpan ke dalam file: {output_file}")  # Menampilkan pesan bahwa penyimpanan selesai

6. Fungsi main

In [None]:
def main():
    # Konfigurasi
    pdf_path = "./data/tko.pdf"  # Jalur ke file PDF yang akan diproses
    output_file = "./output_chunks/output_recursive.txt"  # File untuk menyimpan hasil chunking
    chunk_size = 1000  # Ukuran maksimum setiap chunk

    # Load PDF dan ekstrak teks
    print("Mengekstrak teks dari PDF...")
    full_text = load_pdf_text(pdf_path)  # Memanggil fungsi untuk mengekstrak teks dari PDF

    if not full_text:  # Jika tidak ada teks yang diekstrak
        print("Tidak ada teks yang bisa diekstrak. Program dihentikan.")  # Menampilkan pesan kesalahan
        return

    # Chunk teks secara rekursif
    print("Membagi teks menjadi chunks secara rekursif...")
    chunks = recursive_chunk(full_text, chunk_size) 

    print(f"Selesai membagi menjadi {len(chunks)} chunks.")  # Menampilkan jumlah chunk yang dihasilkan

    # Simpan semua chunk ke satu file 
    save_chunks_to_file(chunks, output_file) 

7. Eksekusi Program

In [None]:
if __name__ == "__main__":
    main()  # Menjalankan fungsi main jika file ini dieksekusi sebagai skrip utama

8. Keseluruhan Kode Program 

In [1]:
import fitz  # PyMuPDF untuk membaca file PDF
import os  # Modul os untuk operasi sistem file
import re  # Modul re untuk ekspresi reguler

def clean_text(text):
    """
    Membersihkan dan merapikan teks hasil ekstraksi.
    """
    text = re.sub(r'\s+', ' ', text)  # Mengganti spasi berlebih dengan satu spasi
    text = re.sub(r'(\w+)-\s*\n\s*(\w+)', r'\1\2', text)  # Menggabungkan kata yang terputus di akhir baris
    text = re.sub(r'\n+', ' ', text)  # Mengganti newline dengan spasi
    return text.strip()  # Menghapus spasi di awal dan akhir teks

def load_pdf_text(pdf_path):
    """
    Load dan ekstrak teks dari PDF.
    """
    try:
        doc = fitz.open(pdf_path)  # Membuka file PDF
        full_text = ""  # Inisialisasi string untuk menyimpan teks
        
        for page_num in range(len(doc)):  # Iterasi melalui setiap halaman
            page = doc.load_page(page_num)  # Memuat halaman
            page_text = page.get_text("text")  # Mengambil teks dari halaman
            page_text = clean_text(page_text)  # Membersihkan teks
            full_text += page_text + " "  # Menambahkan teks halaman ke full_text
        
        return clean_text(full_text)  # Mengembalikan teks yang telah dibersihkan
    except Exception as e:
        print(f"Terjadi kesalahan: {e}")  # Menangani kesalahan saat membuka file
        return ""  # Mengembalikan string kosong jika terjadi kesalahan

def recursive_chunk(text, chunk_size, separators=None):
    """
    Membagi teks secara rekursif menjadi chunk dengan panjang maksimum menggunakan separator yang ditentukan.
    """
    if not text.strip():  # Jika teks kosong, kembalikan daftar kosong
        return []

    if separators is None:  # Jika tidak ada separator yang ditentukan, gunakan titik sebagai default
        separators = ["."]
    
    for separator in separators:  # Iterasi melalui setiap separator
        if separator in text[:chunk_size]:  # Jika separator ditemukan dalam batas chunk_size
            split_index = text[:chunk_size].rfind(separator) + len(separator)  # Temukan indeks pemisah terakhir
            chunk = text[:split_index]  # Ambil chunk
            remaining_text = text[split_index:].strip()  # Ambil sisa teks
            return [chunk] + recursive_chunk(remaining_text, chunk_size, separators)  # Kembalikan chunk dan sisa teks yang dipecah

    chunk = text[:chunk_size]  # Jika tidak ada separator ditemukan, ambil chunk berdasarkan ukuran
    remaining_text = text[chunk_size:].strip()  # Ambil sisa teks
    return [chunk] + recursive_chunk(remaining_text, chunk_size, separators)  # Kembalikan chunk dan sisa teks yang dipecah

def save_chunks_to_file(chunks, output_file):
    """
    Menyimpan semua chunk ke dalam satu file.
    """
    with open(output_file, "w", encoding="utf-8") as f:  # Membuka file untuk menulis
        for idx, chunk in enumerate(chunks, 1):  # Iterasi melalui setiap chunk
            f.write(f"Chunk {idx:03d}:\n")  # Menulis header untuk chunk
            f.write(chunk)  # Menulis isi chunk
            f.write("\n\n")  # Menambahkan newline setelah setiap chunk
    
    print(f"Semua chunks telah disimpan ke dalam file: {output_file}")  # Menampilkan pesan bahwa penyimpanan selesai

def main():
    # Konfigurasi
    pdf_path = "./data/tko.pdf"  # Jalur ke file PDF yang akan diproses
    output_file = "./output_chunks/output_recursive.txt"  # File untuk menyimpan hasil chunking
    chunk_size = 1000  # Ukuran maksimum setiap chunk

    # Load PDF dan ekstrak teks
    print("Mengekstrak teks dari PDF...")
    full_text = load_pdf_text(pdf_path)  # Memanggil fungsi untuk mengekstrak teks dari PDF

    if not full_text:  # Jika tidak ada teks yang diekstrak
        print("Tidak ada teks yang bisa diekstrak. Program dihentikan.")  # Menampilkan pesan kesalahan
        return

    # Chunk teks secara rekursif
    print("Membagi teks menjadi chunks secara rekursif...")
    chunks = recursive_chunk(full_text, chunk_size) 

    print(f"Selesai membagi menjadi {len(chunks)} chunks.")  # Menampilkan jumlah chunk yang dihasilkan

    # Simpan semua chunk ke satu file 
    save_chunks_to_file(chunks, output_file) 

if __name__ == "__main__":
    main()  # Menjalankan fungsi main jika file ini dieksekusi sebagai skrip utama

Mengekstrak teks dari PDF...
Membagi teks menjadi chunks secara rekursif...
Selesai membagi menjadi 11 chunks.
Semua chunks telah disimpan ke dalam file: ./output_chunks/output_recursive.txt


### OVERLAP

1. Import Library

In [None]:
import os  # Mengimpor modul os untuk operasi sistem file
import fitz  # Mengimpor fitz (PyMuPDF) untuk membaca file PDF
import nltk  # Mengimpor NLTK untuk pemrosesan bahasa alami
import re  # Mengimpor modul re untuk ekspresi reguler
import textwrap  # Mengimpor modul textwrap untuk membungkus teks
from langchain.text_splitter import RecursiveCharacterTextSplitter  # Mengimpor RecursiveCharacterTextSplitter untuk membagi teks
from nltk.tokenize import sent_tokenize  # Mengimpor fungsi untuk tokenisasi kalimat

2. Kelas PDFChunker

    Berisikan fungsi
    - init (Konfigurasi paramter dan text_splitter)
    - extract_text_from_pdf
    - clean_text
    - create_improved_chunks
    - wrap_text
    - save_chunks_to_file
    - process_pdf
    - analyze_chunks

In [None]:
class PDFChunker:
    def __init__(self, chunk_size=1000, chunk_overlap=200, line_width=80):
        # Inisialisasi parameter untuk chunking
        self.chunk_size = chunk_size  # Ukuran maksimum setiap chunk
        self.chunk_overlap = chunk_overlap  # Jumlah karakter yang tumpang tindih antara chunk
        self.line_width = line_width  # Lebar baris untuk membungkus teks
        # Inisialisasi text splitter dengan parameter yang ditentukan
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            length_function=len,
            separators=["\n\n", "\n", ".", "!", "?", ",", " ", ""]
        )

    def extract_text_from_pdf(self, pdf_path):
        """Extract text from PDF file with better formatting."""
        try:
            doc = fitz.open(pdf_path)  # Membuka file PDF
            text_blocks = []  # Inisialisasi daftar untuk menyimpan blok teks
            
            for page in doc:  # Iterasi melalui setiap halaman
                blocks = page.get_text("blocks")  # Mengambil blok teks dari halaman
                for block in blocks:  # Iterasi melalui setiap blok
                    clean_text = block[4].strip()  # Mengambil teks dan menghapus spasi di awal/akhir
                    if clean_text:  # Jika teks tidak kosong
                        text_blocks.append(clean_text)  # Tambahkan blok teks ke daftar
            
            text = " ".join(text_blocks)  # Gabungkan semua blok teks menjadi satu string
            return self.clean_text(text)  # Mengembalikan teks yang telah dibersihkan
        except Exception as e:
            print(f"Error reading PDF: {e}")  # Menangani kesalahan saat membaca PDF
            return None
        finally:
            if 'doc' in locals():  # Pastikan dokumen ditutup jika berhasil dibuka
                doc.close()

    def clean_text(self, text):
        """Clean and normalize text while preserving structure."""
        if not text:  # Jika teks kosong
            return ""
        
        # Menghapus spasi berlebih dan normalisasi tanda baca
        text = re.sub(r'\s+', ' ', text)  # Mengganti spasi berlebih dengan satu spasi
        text = re.sub(r'([.!?])\s*', r'\1 ', text)  # Menambahkan spasi setelah tanda baca
        text = re.sub(r'\s+([.!?])', r'\1', text)  # Menghapus spasi sebelum tanda baca
        text = re.sub(r'\n\s*\n', '\n', text)  # Menghapus newline berlebih
        
        return text.strip()  # Mengembalikan teks yang telah dibersihkan

    def create_improved_chunks(self, text):
        """Create chunks with improved sentence handling."""
        if not text:  # Jika teks kosong
            return []

        sentences = sent_tokenize(text)  # Tokenisasi teks menjadi kalimat
        chunks = []  # Inisialisasi daftar untuk menyimpan chunk
        current_chunk = []  # Inisialisasi daftar untuk menyimpan kalimat dalam chunk
        current_length = 0  # Inisialisasi panjang chunk saat ini

        for sentence in sentences:  # Iterasi melalui setiap kalimat
            sentence = sentence.strip()  # Menghapus spasi di awal/akhir kalimat
            sentence_length = len(sentence)  # Menghitung panjang kalimat

            if not current_chunk:  # Jika chunk saat ini kosong
                current_chunk.append(sentence)  # Tambahkan kalimat ke chunk
                current_length = sentence_length  # Perbarui panjang chunk
                continue

            # Jika panjang chunk ditambah kalimat tidak melebihi batas
            if current_length + len(" ") + sentence_length <= self.chunk_size:
                current_chunk.append(sentence)  # Tambahkan kalimat ke chunk
                current_length += len(" ") + sentence_length  # Perbarui panjang chunk
            else:
                chunks.append(" ".join(current_chunk))  # Tambahkan chunk ke daftar
                current_chunk = [sentence]  # Mulai chunk baru dengan kalimat saat ini
                current_length = sentence_length  # Perbarui panjang chunk

        if current_chunk:  # Jika ada chunk yang tersisa
            chunks.append(" ".join(current_chunk))  # Tambahkan chunk terakhir ke daftar

        final_chunks = []  # Inisialisasi daftar untuk menyimpan chunk akhir
        for i in range(len(chunks)):  # Iterasi melalui setiap chunk
            if i > 0:  # Jika bukan chunk pertama
                prev_chunk_sentences = sent_tokenize(chunks[i-1])  # Tokenisasi chunk sebelumnya
                overlap_sentences = prev_chunk_sentences[-2:] if len(prev_chunk_sentences) > 2 else prev_chunk_sentences  # Ambil dua kalimat terakhir untuk tumpang tindih
                current_chunk = " ".join(overlap_sentences) + " " + chunks[i]  # Gabungkan kalimat tumpang tindih dengan chunk saat ini
                final_chunks.append(current_chunk)  # Tambahkan chunk yang telah digabungkan ke daftar
            else:
                final_chunks.append(chunks[i])  # Tambahkan chunk pertama ke daftar

        return final_chunks  # Mengembalikan daftar chunk akhir

    def wrap_text(self, text):
        """Wrap text to specified line width while preserving paragraphs."""
        paragraphs = text.split('\n\n')  # Memisahkan teks menjadi paragraf
        
        wrapped_paragraphs = []  # Inisialisasi daftar untuk menyimpan paragraf yang dibungkus
        for paragraph in paragraphs:  # Iterasi melalui setiap paragraf
            wrapped = textwrap.fill(paragraph.strip(), width=self.line_width)  # Membungkus paragraf dengan lebar yang ditentukan
            wrapped_paragraphs.append(wrapped)  # Tambahkan paragraf yang dibungkus ke daftar
        
        return '\n\n'.join(wrapped_paragraphs)  # Menggabungkan paragraf yang dibungkus dengan newline ganda

    def save_chunks_to_file(self, chunks, output_folder="chunks_output"):
        """Save chunks to a single file with proper text wrapping."""
        if not os.path.exists(output_folder):  # Jika folder output tidak ada
            os.makedirs(output_folder)  # Buat folder output

        output_file = os.path.join(output_folder, "output_overlap.txt")  # Tentukan nama file output
        
        try:
            with open(output_file, "w", encoding="utf-8") as file:  # Membuka file untuk menulis
                for i, chunk in enumerate(chunks, 1):  # Iterasi melalui setiap chunk
                    header = f"Chunk {i}"  # Membuat header untuk chunk
                    file.write(f"{header}\n")  # Menulis header ke file
                    file.write("="* len(header) + "\n\n")  # Menulis garis pemisah
                    
                    wrapped_content = self.wrap_text(chunk)  # Membungkus konten chunk
                    file.write(wrapped_content)  # Menulis konten chunk yang dibungkus ke file
                    file.write("\n\n\n")  # Menambahkan spasi ekstra antara chunk

            print(f"✅ {len(chunks)} chunks successfully saved to '{output_file}'")  # Menampilkan pesan sukses
        except Exception as e:
            print(f"Error saving chunks: {e}")  # Menangani kesalahan saat menyimpan chunk

    def process_pdf(self, pdf_path):
        """Process PDF and create better chunks."""
        text = self.extract_text_from_pdf(pdf_path)  # Mengambil teks dari PDF
        if not text:  # Jika tidak ada teks
            return []
        
        return self.create_improved_chunks(text)  # Mengembalikan chunk yang telah dibuat

    def analyze_chunks(self, chunks):
        """Analyze chunks with improved metrics."""
        if not chunks:  # Jika tidak ada chunk
            return {
                "total_chunks": 0,
                "average_size": 0,
                "size_stats": {},
                "sentence_stats": {}
            }

        chunk_sizes = [len(chunk) for chunk in chunks]  # Menghitung ukuran setiap chunk
        sentences_per_chunk = [len(sent_tokenize(chunk)) for chunk in chunks]  # Menghitung jumlah kalimat per chunk

        analysis = {
            "total_chunks": len(chunks),  # Total chunk
            "average_size": sum(chunk_sizes) / len(chunks),  # Ukuran rata-rata chunk
            "size_stats": {
                "min": min(chunk_sizes),  # Ukuran chunk terkecil
                "max": max(chunk_sizes),  # Ukuran chunk terbesar
                "avg": sum(chunk_sizes) / len(chunks)  # Ukuran rata-rata chunk
            },
            "sentence_stats": {
                "min_sentences": min(sentences_per_chunk),  # Jumlah kalimat minimum per chunk
                "max_sentences": max(sentences_per_chunk),  # Jumlah kalimat maksimum per chunk
                "avg_sentences": sum(sentences_per_chunk) / len(sentences_per_chunk)  # Jumlah kalimat rata-rata per chunk
            }
        }
        return analysis  # Mengembalikan hasil analisis chunk

3. Fungsi main

In [None]:
def main():
    # Inisialisasi chunker dengan parameter untuk membungkus teks
    chunker = PDFChunker(chunk_size=1000, chunk_overlap=200, line_width=80)
    
    # Konfigurasi
    pdf_path = "./data/tko.pdf"  # Jalur ke file PDF yang akan diproses
    output_folder = "./output_chunks/"  # Folder untuk menyimpan hasil chunking
    
    # Proses PDF
    chunks = chunker.process_pdf(pdf_path)  # Memanggil fungsi untuk memproses PDF
    
    if chunks:  # Jika ada chunk yang dihasilkan
        # Simpan hasil
        chunker.save_chunks_to_file(chunks, output_folder)  # Memanggil fungsi untuk menyimpan chunk ke file
        
        # Analisis dan tampilkan hasil
        analysis = chunker.analyze_chunks(chunks)  # Memanggil fungsi untuk menganalisis chunk
        print("\nChunking Analysis:")  # Menampilkan analisis chunk
        print(f"Total chunks: {analysis['total_chunks']}")  # Menampilkan total chunk
        print(f"Average chunk size: {analysis['size_stats']['avg']:.0f} characters")  # Menampilkan ukuran rata-rata chunk
        print(f"Min/Max chunk size: {analysis['size_stats']['min']}/{analysis['size_stats']['max']} characters")  # Menampilkan ukuran chunk minimum dan maksimum
        print(f"\nSentences per chunk:")  # Menampilkan jumlah kalimat per chunk
        print(f"Min: {analysis['sentence_stats']['min_sentences']:.0f}")  # Menampilkan jumlah kalimat minimum
        print(f"Max: {analysis['sentence_stats']['max_sentences']:.0f}")  # Menampilkan jumlah kalimat maksimum
        print(f"Average: {analysis['sentence_stats']['avg_sentences']:.1f}")  # Menampilkan jumlah kalimat rata-rata

4. Eksekusi program

In [None]:
if __name__ == "__main__":
    main()  # Menjalankan fungsi main jika file ini dieksekusi sebagai skrip utama

5. Keseluruhan kode program

In [2]:
import os  # Mengimpor modul os untuk operasi sistem file
import fitz  # Mengimpor fitz (PyMuPDF) untuk membaca file PDF
import nltk  # Mengimpor NLTK untuk pemrosesan bahasa alami
import re  # Mengimpor modul re untuk ekspresi reguler
import textwrap  # Mengimpor modul textwrap untuk membungkus teks
from langchain.text_splitter import RecursiveCharacterTextSplitter  # Mengimpor RecursiveCharacterTextSplitter untuk membagi teks
from nltk.tokenize import sent_tokenize  # Mengimpor fungsi untuk tokenisasi kalimat

class PDFChunker:
    def __init__(self, chunk_size=1000, chunk_overlap=200, line_width=80):
        # Inisialisasi parameter untuk chunking
        self.chunk_size = chunk_size  # Ukuran maksimum setiap chunk
        self.chunk_overlap = chunk_overlap  # Jumlah karakter yang tumpang tindih antara chunk
        self.line_width = line_width  # Lebar baris untuk membungkus teks
        # Inisialisasi text splitter dengan parameter yang ditentukan
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            length_function=len,
            separators=["\n\n", "\n", ".", "!", "?", ",", " ", ""]
        )

    def extract_text_from_pdf(self, pdf_path):
        """Extract text from PDF file with better formatting."""
        try:
            doc = fitz.open(pdf_path)  # Membuka file PDF
            text_blocks = []  # Inisialisasi daftar untuk menyimpan blok teks
            
            for page in doc:  # Iterasi melalui setiap halaman
                blocks = page.get_text("blocks")  # Mengambil blok teks dari halaman
                for block in blocks:  # Iterasi melalui setiap blok
                    clean_text = block[4].strip()  # Mengambil teks dan menghapus spasi di awal/akhir
                    if clean_text:  # Jika teks tidak kosong
                        text_blocks.append(clean_text)  # Tambahkan blok teks ke daftar
            
            text = " ".join(text_blocks)  # Gabungkan semua blok teks menjadi satu string
            return self.clean_text(text)  # Mengembalikan teks yang telah dibersihkan
        except Exception as e:
            print(f"Error reading PDF: {e}")  # Menangani kesalahan saat membaca PDF
            return None
        finally:
            if 'doc' in locals():  # Pastikan dokumen ditutup jika berhasil dibuka
                doc.close()

    def clean_text(self, text):
        """Clean and normalize text while preserving structure."""
        if not text:  # Jika teks kosong
            return ""
        
        # Menghapus spasi berlebih dan normalisasi tanda baca
        text = re.sub(r'\s+', ' ', text)  # Mengganti spasi berlebih dengan satu spasi
        text = re.sub(r'([.!?])\s*', r'\1 ', text)  # Menambahkan spasi setelah tanda baca
        text = re.sub(r'\s+([.!?])', r'\1', text)  # Menghapus spasi sebelum tanda baca
        text = re.sub(r'\n\s*\n', '\n', text)  # Menghapus newline berlebih
        
        return text.strip()  # Mengembalikan teks yang telah dibersihkan

    def create_improved_chunks(self, text):
        """Create chunks with improved sentence handling."""
        if not text:  # Jika teks kosong
            return []

        sentences = sent_tokenize(text)  # Tokenisasi teks menjadi kalimat
        chunks = []  # Inisialisasi daftar untuk menyimpan chunk
        current_chunk = []  # Inisialisasi daftar untuk menyimpan kalimat dalam chunk
        current_length = 0  # Inisialisasi panjang chunk saat ini

        for sentence in sentences:  # Iterasi melalui setiap kalimat
            sentence = sentence.strip()  # Menghapus spasi di awal/akhir kalimat
            sentence_length = len(sentence)  # Menghitung panjang kalimat

            if not current_chunk:  # Jika chunk saat ini kosong
                current_chunk.append(sentence)  # Tambahkan kalimat ke chunk
                current_length = sentence_length  # Perbarui panjang chunk
                continue

            # Jika panjang chunk ditambah kalimat tidak melebihi batas
            if current_length + len(" ") + sentence_length <= self.chunk_size:
                current_chunk.append(sentence)  # Tambahkan kalimat ke chunk
                current_length += len(" ") + sentence_length  # Perbarui panjang chunk
            else:
                chunks.append(" ".join(current_chunk))  # Tambahkan chunk ke daftar
                current_chunk = [sentence]  # Mulai chunk baru dengan kalimat saat ini
                current_length = sentence_length  # Perbarui panjang chunk

        if current_chunk:  # Jika ada chunk yang tersisa
            chunks.append(" ".join(current_chunk))  # Tambahkan chunk terakhir ke daftar

        final_chunks = []  # Inisialisasi daftar untuk menyimpan chunk akhir
        for i in range(len(chunks)):  # Iterasi melalui setiap chunk
            if i > 0:  # Jika bukan chunk pertama
                prev_chunk_sentences = sent_tokenize(chunks[i-1])  # Tokenisasi chunk sebelumnya
                overlap_sentences = prev_chunk_sentences[-2:] if len(prev_chunk_sentences) > 2 else prev_chunk_sentences  # Ambil dua kalimat terakhir untuk tumpang tindih
                current_chunk = " ".join(overlap_sentences) + " " + chunks[i]  # Gabungkan kalimat tumpang tindih dengan chunk saat ini
                final_chunks.append(current_chunk)  # Tambahkan chunk yang telah digabungkan ke daftar
            else:
                final_chunks.append(chunks[i])  # Tambahkan chunk pertama ke daftar

        return final_chunks  # Mengembalikan daftar chunk akhir

    def wrap_text(self, text):
        """Wrap text to specified line width while preserving paragraphs."""
        paragraphs = text.split('\n\n')  # Memisahkan teks menjadi paragraf
        
        wrapped_paragraphs = []  # Inisialisasi daftar untuk menyimpan paragraf yang dibungkus
        for paragraph in paragraphs:  # Iterasi melalui setiap paragraf
            wrapped = textwrap.fill(paragraph.strip(), width=self.line_width)  # Membungkus paragraf dengan lebar yang ditentukan
            wrapped_paragraphs.append(wrapped)  # Tambahkan paragraf yang dibungkus ke daftar
        
        return '\n\n'.join(wrapped_paragraphs)  # Menggabungkan paragraf yang dibungkus dengan newline ganda

    def save_chunks_to_file(self, chunks, output_folder="chunks_output"):
        """Save chunks to a single file with proper text wrapping."""
        if not os.path.exists(output_folder):  # Jika folder output tidak ada
            os.makedirs(output_folder)  # Buat folder output

        output_file = os.path.join(output_folder, "output_overlap.txt")  # Tentukan nama file output
        
        try:
            with open(output_file, "w", encoding="utf-8") as file:  # Membuka file untuk menulis
                for i, chunk in enumerate(chunks, 1):  # Iterasi melalui setiap chunk
                    header = f"Chunk {i}"  # Membuat header untuk chunk
                    file.write(f"{header}\n")  # Menulis header ke file
                    file.write("="* len(header) + "\n\n")  # Menulis garis pemisah
                    
                    wrapped_content = self.wrap_text(chunk)  # Membungkus konten chunk
                    file.write(wrapped_content)  # Menulis konten chunk yang dibungkus ke file
                    file.write("\n\n\n")  # Menambahkan spasi ekstra antara chunk

            print(f"✅ {len(chunks)} chunks successfully saved to '{output_file}'")  # Menampilkan pesan sukses
        except Exception as e:
            print(f"Error saving chunks: {e}")  # Menangani kesalahan saat menyimpan chunk

    def process_pdf(self, pdf_path):
        """Process PDF and create better chunks."""
        text = self.extract_text_from_pdf(pdf_path)  # Mengambil teks dari PDF
        if not text:  # Jika tidak ada teks
            return []
        
        return self.create_improved_chunks(text)  # Mengembalikan chunk yang telah dibuat

    def analyze_chunks(self, chunks):
        """Analyze chunks with improved metrics."""
        if not chunks:  # Jika tidak ada chunk
            return {
                "total_chunks": 0,
                "average_size": 0,
                "size_stats": {},
                "sentence_stats": {}
            }

        chunk_sizes = [len(chunk) for chunk in chunks]  # Menghitung ukuran setiap chunk
        sentences_per_chunk = [len(sent_tokenize(chunk)) for chunk in chunks]  # Menghitung jumlah kalimat per chunk

        analysis = {
            "total_chunks": len(chunks),  # Total chunk
            "average_size": sum(chunk_sizes) / len(chunks),  # Ukuran rata-rata chunk
            "size_stats": {
                "min": min(chunk_sizes),  # Ukuran chunk terkecil
                "max": max(chunk_sizes),  # Ukuran chunk terbesar
                "avg": sum(chunk_sizes) / len(chunks)  # Ukuran rata-rata chunk
            },
            "sentence_stats": {
                "min_sentences": min(sentences_per_chunk),  # Jumlah kalimat minimum per chunk
                "max_sentences": max(sentences_per_chunk),  # Jumlah kalimat maksimum per chunk
                "avg_sentences": sum(sentences_per_chunk) / len(sentences_per_chunk)  # Jumlah kalimat rata-rata per chunk
            }
        }
        return analysis  # Mengembalikan hasil analisis chunk

def main():
    # Inisialisasi chunker dengan parameter untuk membungkus teks
    chunker = PDFChunker(chunk_size=1000, chunk_overlap=200, line_width=80)
    
    # Konfigurasi
    pdf_path = "./data/tko.pdf"  # Jalur ke file PDF yang akan diproses
    output_folder = "./output_chunks/"  # Folder untuk menyimpan hasil chunking
    
    # Proses PDF
    chunks = chunker.process_pdf(pdf_path)  # Memanggil fungsi untuk memproses PDF
    
    if chunks:  # Jika ada chunk yang dihasilkan
        # Simpan hasil
        chunker.save_chunks_to_file(chunks, output_folder)  # Memanggil fungsi untuk menyimpan chunk ke file
        
        # Analisis dan tampilkan hasil
        analysis = chunker.analyze_chunks(chunks)  # Memanggil fungsi untuk menganalisis chunk
        print("\nChunking Analysis:")  # Menampilkan analisis chunk
        print(f"Total chunks: {analysis['total_chunks']}")  # Menampilkan total chunk
        print(f"Average chunk size: {analysis['size_stats']['avg']:.0f} characters")  # Menampilkan ukuran rata-rata chunk
        print(f"Min/Max chunk size: {analysis['size_stats']['min']}/{analysis['size_stats']['max']} characters")  # Menampilkan ukuran chunk minimum dan maksimum
        print(f"\nSentences per chunk:")  # Menampilkan jumlah kalimat per chunk
        print(f"Min: {analysis['sentence_stats']['min_sentences']:.0f}")  # Menampilkan jumlah kalimat minimum
        print(f"Max: {analysis['sentence_stats']['max_sentences']:.0f}")  # Menampilkan jumlah kalimat maksimum
        print(f"Average: {analysis['sentence_stats']['avg_sentences']:.1f}")  # Menampilkan jumlah kalimat rata-rata

if __name__ == "__main__":
    main()  # Menjalankan fungsi main jika file ini dieksekusi sebagai skrip utama

✅ 10 chunks successfully saved to './output_chunks/output_overlap.txt'

Chunking Analysis:
Total chunks: 10
Average chunk size: 1087 characters
Min/Max chunk size: 754/1444 characters

Sentences per chunk:
Min: 7
Max: 25
Average: 12.2


### TOKEN BASE

1. Import Library

In [None]:
import os  # Mengimpor modul os untuk operasi sistem file
import tiktoken  # Mengimpor tiktoken untuk pengkodean token
import fitz  # Mengimpor fitz (PyMuPDF) untuk membaca file PDF
from datetime import datetime  # Mengimpor datetime untuk menangani tanggal dan waktu
from typing import List, Tuple  # Mengimpor tipe data List dan Tuple untuk anotasi tipe
import re  # Mengimpor modul re untuk ekspresi reguler
import logging  # Mengimpor modul logging untuk mencatat informasi
import gc  # Mengimpor modul gc untuk pengelolaan memori
import nltk  # Mengimpor NLTK untuk pemrosesan bahasa alami
nltk.download('punkt')  # Mengunduh model tokenisasi kalimat dari NLTK
from nltk.tokenize import sent_tokenize  # Mengimpor fungsi untuk tokenisasi kalimat

2. Setup logging

In [None]:
logging.basicConfig(
    filename='pdf_processing.log',  # Menentukan nama file log
    level=logging.INFO,  # Menentukan level logging
    format='%(asctime)s - %(levelname)s - %(message)s'  # Format pesan log
)

3. Kelas SentenceChunkProcessor

    Berisikan fungsi
    - init (Konfigurasi dan inisialisasi parameter)
    - read_pdf_and_clean
    - restore_numbering
    - chunk_text_by_sentences
    - save_chunks

In [None]:
class SentenceChunkProcessor:
    def __init__(self, chunk_size=500):
        # Inisialisasi parameter untuk chunking
        self.chunk_size = chunk_size  # Ukuran maksimum setiap chunk
        self.encoder = tiktoken.get_encoding("cl100k_base")  # Menginisialisasi encoder token

    def read_pdf_and_clean(self, file_path: str) -> str:
        """Extract text from PDF file with better formatting."""
        logging.info(f"Started processing PDF: {file_path}")  # Mencatat bahwa pemrosesan PDF dimulai
        doc = fitz.open(file_path)  # Membuka file PDF
        text = " ".join([page.get_text("text") for page in doc])  # Mengambil teks dari setiap halaman
        text = re.sub(r'\s+', ' ', text).strip()  # Menghapus spasi berlebih dan menghapus spasi di awal/akhir

        # Menghindari pemisahan angka yang salah (4. 3.)
        text = re.sub(r'(\d+)\.\s(?=\d+)', r'\1|', text)  # Mengganti pemisahan angka dengan karakter '|'

        logging.info(f"Successfully read and cleaned PDF: {file_path}")  # Mencatat bahwa PDF berhasil dibaca dan dibersihkan
        return text  # Mengembalikan teks yang telah dibersihkan

    def restore_numbering(self, text: str) -> str:
        """Mengembalikan angka yang sebelumnya diganti (4|3 -> 4. 3.)"""
        return text.replace('|', '. ')  # Mengganti karakter '|' kembali menjadi '. '

    def chunk_text_by_sentences(self, text: str) -> List[Tuple[str, int]]:
        """Create chunks with improved sentence handling."""
        text = self.restore_numbering(text)  # Kembalikan format angka
        sentences = sent_tokenize(text)  # Tokenisasi teks menjadi kalimat
        chunks = []  # Inisialisasi daftar untuk menyimpan chunk
        current_tokens = []  # Inisialisasi daftar untuk menyimpan token saat ini
        current_text = ""  # Inisialisasi string untuk menyimpan teks saat ini

        for sentence in sentences:  # Iterasi melalui setiap kalimat
            sentence = sentence.strip()  # Menghapus spasi di awal/akhir kalimat
            sentence_tokens = self.encoder.encode(sentence)  # Mengkodekan kalimat menjadi token
            
            # Jika penambahan kalimat melebihi ukuran chunk
            if len(current_tokens) + len(sentence_tokens) > self.chunk_size:
                if current_tokens:  # Jika ada token saat ini
                    chunks.append((self.encoder.decode(current_tokens), len(current_tokens)))  # Tambahkan chunk ke daftar
                    current_tokens = []  # Reset token saat ini tanpa overlap
                    current_text = ""  # Reset teks saat ini
            
            current_tokens.extend(sentence_tokens)  # Tambahkan token kalimat ke token saat ini
            current_text += sentence  # Tambahkan kalimat ke teks saat ini
            
            gc.collect()  # Mengumpulkan sampah untuk mengelola memori
        
        if current_tokens:  # Jika ada token yang tersisa
            chunks.append((self.encoder.decode(current_tokens), len(current_tokens)))  # Tambahkan chunk terakhir ke daftar

        logging.info(f"Processed and chunked text into {len(chunks)} chunks")  # Mencatat jumlah chunk yang dihasilkan
        return chunks  # Mengembalikan daftar chunk

    def save_chunks(self, content_chunks: List[Tuple[str, int]], output_path: str) -> None:
        """Save chunks to a single file with proper text wrapping."""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")  # Mendapatkan timestamp untuk penamaan file
        output_path = f"{os.path.splitext(output_path)[0]}_{timestamp}.txt"  # Menambahkan timestamp ke nama file output

        try:
            with open(output_path, "w", encoding="utf-8") as f:  # Membuka file untuk menulis
                f.write(f"Processing Date: {datetime.now()}\n")  # Menulis tanggal pemrosesan
                f.write(f"Chunk Size: {self.chunk_size} tokens\n")  # Menulis ukuran chunk
                f.write("=" * 50 + "\n\n")  # Menulis garis pemisah

                for i, (chunk, token_count) in enumerate(content_chunks, 1):  # Iterasi melalui setiap chunk
                    f.write(f"=== Chunk {i} ===\n")  # Menulis header untuk chunk
                    f.write(f"Tokens: {token_count}\n")  # Menulis jumlah token dalam chunk
                    f.write(chunk + "\n\n")  # Menulis konten chunk
                    f.write("-" * 30 + "\n\n")  # Menulis garis pemisah antar chunk

            logging.info(f"Successfully saved chunks to: {output_path}")  # Mencatat bahwa chunk berhasil disimpan
            logging.info(f"Total chunks: {len(content_chunks)}")  # Mencatat total chunk yang disimpan
        except Exception as e:
            logging.error(f"Error saving chunks: {str(e)}")  # Mencatat kesalahan saat menyimpan chunk
            raise  # Mengangkat kembali kesalahan

4. Eksekusi Program

In [None]:
if __name__ == "__main__":
    file_path = "./data/tko.pdf"  # Menentukan jalur ke file PDF yang akan diproses
    output_path = "./output_chunks/output_token_base.txt"  # Menentukan jalur untuk menyimpan hasil chunking

    processor = SentenceChunkProcessor(chunk_size=500)  # Membuat instance dari SentenceChunkProcessor dengan ukuran chunk 500

    if not os.path.exists(file_path):  # Memeriksa apakah file PDF ada
        print(f"Error: File '{file_path}' tidak ditemukan!")  # Menampilkan pesan kesalahan jika file tidak ditemukan
    else:
        try:
            print("🔍 Memulai proses membaca PDF...")  # Menampilkan pesan bahwa proses membaca PDF dimulai
            text = processor.read_pdf_and_clean(file_path)  # Membaca dan membersihkan teks dari PDF
            print(f"Teks PDF terbaca (50 karakter pertama): {text[:50]}...")  # Menampilkan 50 karakter pertama dari teks yang terbaca

            print("🔍 Memulai proses chunking...")  # Menampilkan pesan bahwa proses chunking dimulai
            content_chunks = processor.chunk_text_by_sentences(text)  # Membagi teks menjadi chunk berdasarkan kalimat
            print(f"Total chunk yang dibuat: {len(content_chunks)}")  # Menampilkan total chunk yang dibuat

            print("🔍 Menyimpan hasil chunking ke output file...")  # Menampilkan pesan bahwa hasil chunking akan disimpan
            processor.save_chunks(content_chunks, output_path)  # Menyimpan chunk ke file output
            print(f"✅ File output tersimpan di: {output_path}")  # Menampilkan pesan sukses

        except Exception as e:
            print(f"❌ Terjadi error: {str(e)}")  # Menampilkan pesan kesalahan jika terjadi error

5. Keseluruhan Program

In [3]:
import os  # Mengimpor modul os untuk operasi sistem file
import tiktoken  # Mengimpor tiktoken untuk pengkodean token
import fitz  # Mengimpor fitz (PyMuPDF) untuk membaca file PDF
from datetime import datetime  # Mengimpor datetime untuk menangani tanggal dan waktu
from typing import List, Tuple  # Mengimpor tipe data List dan Tuple untuk anotasi tipe
import re  # Mengimpor modul re untuk ekspresi reguler
import logging  # Mengimpor modul logging untuk mencatat informasi
import gc  # Mengimpor modul gc untuk pengelolaan memori
import nltk  # Mengimpor NLTK untuk pemrosesan bahasa alami
nltk.download('punkt')  # Mengunduh model tokenisasi kalimat dari NLTK
from nltk.tokenize import sent_tokenize  # Mengimpor fungsi untuk tokenisasi kalimat

# Set up logging
logging.basicConfig(
    filename='pdf_processing.log',  # Menentukan nama file log
    level=logging.INFO,  # Menentukan level logging
    format='%(asctime)s - %(levelname)s - %(message)s'  # Format pesan log
)

class SentenceChunkProcessor:
    def __init__(self, chunk_size=500):
        # Inisialisasi parameter untuk chunking
        self.chunk_size = chunk_size  # Ukuran maksimum setiap chunk
        self.encoder = tiktoken.get_encoding("cl100k_base")  # Menginisialisasi encoder token

    def read_pdf_and_clean(self, file_path: str) -> str:
        """Extract text from PDF file with better formatting."""
        logging.info(f"Started processing PDF: {file_path}")  # Mencatat bahwa pemrosesan PDF dimulai
        doc = fitz.open(file_path)  # Membuka file PDF
        text = " ".join([page.get_text("text") for page in doc])  # Mengambil teks dari setiap halaman
        text = re.sub(r'\s+', ' ', text).strip()  # Menghapus spasi berlebih dan menghapus spasi di awal/akhir

        # Menghindari pemisahan angka yang salah (4. 3.)
        text = re.sub(r'(\d+)\.\s(?=\d+)', r'\1|', text)  # Mengganti pemisahan angka dengan karakter '|'

        logging.info(f"Successfully read and cleaned PDF: {file_path}")  # Mencatat bahwa PDF berhasil dibaca dan dibersihkan
        return text  # Mengembalikan teks yang telah dibersihkan

    def restore_numbering(self, text: str) -> str:
        """Mengembalikan angka yang sebelumnya diganti (4|3 -> 4. 3.)"""
        return text.replace('|', '. ')  # Mengganti karakter '|' kembali menjadi '. '

    def chunk_text_by_sentences(self, text: str) -> List[Tuple[str, int]]:
        """Create chunks with improved sentence handling."""
        text = self.restore_numbering(text)  # Kembalikan format angka
        sentences = sent_tokenize(text)  # Tokenisasi teks menjadi kalimat
        chunks = []  # Inisialisasi daftar untuk menyimpan chunk
        current_tokens = []  # Inisialisasi daftar untuk menyimpan token saat ini
        current_text = ""  # Inisialisasi string untuk menyimpan teks saat ini

        for sentence in sentences:  # Iterasi melalui setiap kalimat
            sentence = sentence.strip()  # Menghapus spasi di awal/akhir kalimat
            sentence_tokens = self.encoder.encode(sentence)  # Mengkodekan kalimat menjadi token
            
            # Jika penambahan kalimat melebihi ukuran chunk
            if len(current_tokens) + len(sentence_tokens) > self.chunk_size:
                if current_tokens:  # Jika ada token saat ini
                    chunks.append((self.encoder.decode(current_tokens), len(current_tokens)))  # Tambahkan chunk ke daftar
                    current_tokens = []  # Reset token saat ini tanpa overlap
                    current_text = ""  # Reset teks saat ini
            
            current_tokens.extend(sentence_tokens)  # Tambahkan token kalimat ke token saat ini
            current_text += sentence  # Tambahkan kalimat ke teks saat ini
            
            gc.collect()  # Mengumpulkan sampah untuk mengelola memori
        
        if current_tokens:  # Jika ada token yang tersisa
            chunks.append((self.encoder.decode(current_tokens), len(current_tokens)))  # Tambahkan chunk terakhir ke daftar

        logging.info(f"Processed and chunked text into {len(chunks)} chunks")  # Mencatat jumlah chunk yang dihasilkan
        return chunks  # Mengembalikan daftar chunk

    def save_chunks(self, content_chunks: List[Tuple[str, int]], output_path: str) -> None:
        """Save chunks to a single file with proper text wrapping."""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")  # Mendapatkan timestamp untuk penamaan file
        output_path = f"{os.path.splitext(output_path)[0]}_{timestamp}.txt"  # Menambahkan timestamp ke nama file output

        try:
            with open(output_path, "w", encoding="utf-8") as f:  # Membuka file untuk menulis
                f.write(f"Processing Date: {datetime.now()}\n")  # Menulis tanggal pemrosesan
                f.write(f"Chunk Size: {self.chunk_size} tokens\n")  # Menulis ukuran chunk
                f.write("=" * 50 + "\n\n")  # Menulis garis pemisah

                for i, (chunk, token_count) in enumerate(content_chunks, 1):  # Iterasi melalui setiap chunk
                    f.write(f"=== Chunk {i} ===\n")  # Menulis header untuk chunk
                    f.write(f"Tokens: {token_count}\n")  # Menulis jumlah token dalam chunk
                    f.write(chunk + "\n\n")  # Menulis konten chunk
                    f.write("-" * 30 + "\n\n")  # Menulis garis pemisah antar chunk

            logging.info(f"Successfully saved chunks to: {output_path}")  # Mencatat bahwa chunk berhasil disimpan
            logging.info(f"Total chunks: {len(content_chunks)}")  # Mencatat total chunk yang disimpan
        except Exception as e:
            logging.error(f"Error saving chunks: {str(e)}")  # Mencatat kesalahan saat menyimpan chunk
            raise  # Mengangkat kembali kesalahan

# Main Program
if __name__ == "__main__":
    file_path = "./data/tko.pdf"  # Menentukan jalur ke file PDF yang akan diproses
    output_path = "./output_chunks/output_token_base.txt"  # Menentukan jalur untuk menyimpan hasil chunking

    processor = SentenceChunkProcessor(chunk_size=500)  # Membuat instance dari SentenceChunkProcessor dengan ukuran chunk 500

    if not os.path.exists(file_path):  # Memeriksa apakah file PDF ada
        print(f"Error: File '{file_path}' tidak ditemukan!")  # Menampilkan pesan kesalahan jika file tidak ditemukan
    else:
        try:
            print("🔍 Memulai proses membaca PDF...")  # Menampilkan pesan bahwa proses membaca PDF dimulai
            text = processor.read_pdf_and_clean(file_path)  # Membaca dan membersihkan teks dari PDF
            print(f"Teks PDF terbaca (50 karakter pertama): {text[:50]}...")  # Menampilkan 50 karakter pertama dari teks yang terbaca

            print("🔍 Memulai proses chunking...")  # Menampilkan pesan bahwa proses chunking dimulai
            content_chunks = processor.chunk_text_by_sentences(text)  # Membagi teks menjadi chunk berdasarkan kalimat
            print(f"Total chunk yang dibuat: {len(content_chunks)}")  # Menampilkan total chunk yang dibuat

            print("🔍 Menyimpan hasil chunking ke output file...")  # Menampilkan pesan bahwa hasil chunking akan disimpan
            processor.save_chunks(content_chunks, output_path)  # Menyimpan chunk ke file output
            print(f"✅ File output tersimpan di: {output_path}")  # Menampilkan pesan sukses

        except Exception as e:
            print(f"❌ Terjadi error: {str(e)}")  # Menampilkan pesan kesalahan jika terjadi error

[nltk_data] Downloading package punkt to C:\Users\Agus
[nltk_data]     Syuhada\AppData\Roaming\nltk_data...
[nltk_data]   Package punkt is already up-to-date!


🔍 Memulai proses membaca PDF...
Teks PDF terbaca (50 karakter pertama): TATA KERJA ORGANISASI PENGELOLAAN AKUN APLIKASI No...
🔍 Memulai proses chunking...
Total chunk yang dibuat: 6
🔍 Menyimpan hasil chunking ke output file...
✅ File output tersimpan di: ./output_chunks/output_token_base.txt


### CONTEXT ENRICHED

1. Import Library

In [None]:
import os  # Mengimpor modul os untuk operasi sistem file
import re  # Mengimpor modul re untuk ekspresi reguler
from PyPDF2 import PdfReader  # Mengimpor PdfReader dari PyPDF2 untuk membaca file PDF
from langchain.text_splitter import RecursiveCharacterTextSplitter  # Mengimpor RecursiveCharacterTextSplitter untuk membagi teks
from reportlab.lib.pagesizes import A4  # Mengimpor ukuran halaman A4 dari reportlab
from reportlab.pdfgen import canvas  # Mengimpor canvas dari reportlab untuk membuat PDF

2. Kelas Advanced Chunking
    
    Berisikan fungsi
    - init (Konfigurasi dan inisialisasi parameter)
    - baca_pdf
    - preprocessing_teks
    - chunk_dengan_konteks
    - proses_folder_pdf

In [None]:
class AdvancedChunking:
    def __init__(self, ukuran_chunk=1000, overlap=200):
        # Inisialisasi parameter untuk chunking
        self.ukuran_chunk = ukuran_chunk  # Ukuran maksimum setiap chunk
        self.overlap = overlap  # Jumlah karakter yang tumpang tindih antara chunk
        self.output_folder = 'output_chunks'  # Folder untuk menyimpan hasil chunking
        os.makedirs(self.output_folder, exist_ok=True)  # Membuat folder output jika belum ada

    def baca_pdf(self, path_file):
        """Membaca teks dari file PDF."""
        pdf_reader = PdfReader(path_file)  # Membuka file PDF
        teks = "".join([halaman.extract_text() or "" for halaman in pdf_reader.pages])  # Mengambil teks dari setiap halaman
        return teks  # Mengembalikan teks yang diekstrak

    def preprocessing_teks(self, teks):
        """Membersihkan teks dari spasi berlebih dan karakter tidak perlu."""
        return re.sub(r'\s+', ' ', teks).strip()  # Menghapus spasi berlebih dan mengembalikan teks yang telah dibersihkan

    def chunk_dengan_konteks(self, teks):
        """Membagi teks menjadi chunk dengan mempertahankan sedikit konteks dari chunk sebelumnya dan berikutnya."""
        teks_bersih = self.preprocessing_teks(teks)  # Membersihkan teks
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=self.ukuran_chunk,  # Ukuran chunk
            chunk_overlap=self.overlap,  # Jumlah overlap
            length_function=len,  # Fungsi untuk menghitung panjang
            separators=[".", "\n\n", "\n", " ", ""]  # Pemisah untuk chunking
        )

        chunks = text_splitter.create_documents([teks_bersih])  # Membagi teks menjadi chunk
        chunks_dengan_konteks = []  # Inisialisasi daftar untuk menyimpan chunk dengan konteks

        for i, chunk in enumerate(chunks):  # Iterasi melalui setiap chunk
            konteks_sebelum = chunks[i - 1].page_content if i > 0 else ""  # Ambil konteks dari chunk sebelumnya
            konteks_sesudah = chunks[i + 1].page_content if i < len(chunks) - 1 else ""  # Ambil konteks dari chunk berikutnya

            # Gabungkan konteks dengan chunk saat ini
            chunk_konteks = f"{konteks_sebelum} {chunk.page_content} {konteks_sesudah}".strip()
            chunk_konteks = chunk_konteks.rstrip('.') + '.'  # Pastikan chunk diakhiri dengan titik

            # Metadata untuk chunk
            metadata = {
                'chunk_id': i,
                'total_chunks': len(chunks),
                'panjang_chunk': len(chunk_konteks),
                'konteks_sebelum': konteks_sebelum[:50],  # Ambil 50 karakter pertama dari konteks sebelumnya
                'konteks_sesudah': konteks_sesudah[:50]   # Ambil 50 karakter pertama dari konteks berikutnya
            }

            chunks_dengan_konteks.append({'konten': chunk_konteks, 'metadata': metadata})  # Tambahkan chunk dan metadata ke daftar

        return chunks_dengan_konteks  # Mengembalikan daftar chunk dengan konteks

    def proses_folder_pdf(self, folder_path):
        """Memproses semua file PDF dalam folder dan membaginya menjadi chunk."""
        hasil = {}  # Inisialisasi dictionary untuk menyimpan hasil
        total_chunk = 0  # Inisialisasi total chunk

        for filename in os.listdir(folder_path):  # Iterasi melalui setiap file dalam folder
            if filename.endswith('.pdf'):  # Memeriksa apakah file adalah PDF
                file_path = os.path.join(folder_path, filename)  # Mendapatkan jalur lengkap file
                print(f"\n{'='*50}")  # Menampilkan garis pemisah
                print(f"Memproses file: {filename}")  # Menampilkan nama file yang sedang diproses
                print(f"{'='*50}")

                teks_pdf = self.baca_pdf(file_path)  # Membaca teks dari file PDF
                chunks = self.chunk_dengan_konteks(teks_pdf)  # Membagi teks menjadi chunk total_chunk += len(chunks)  # Menambahkan jumlah chunk yang dihasilkan
                hasil[filename] = {'chunks': chunks}  # Menyimpan hasil chunking dalam dictionary

                # Menyimpan hasil chunking ke file teks
                chunk_file = os.path.join(self.output_folder, f"{filename}_chunks.txt")  # Menentukan jalur untuk file output
                with open(chunk_file, "w", encoding="utf-8") as f:  # Membuka file untuk menulis
                    for i, chunk in enumerate(chunks, 1):  # Iterasi melalui setiap chunk
                        f.write(f"Chunk {i}:\n{chunk['konten']}\n\n")  # Menulis konten chunk ke file

                print(f"Hasil chunking disimpan di: {chunk_file}")  # Menampilkan pesan bahwa hasil chunking telah disimpan

        return hasil  # Mengembalikan hasil chunking

3. Eksekusi Program

In [None]:
if __name__ == "__main__":
    chunker = AdvancedChunking(ukuran_chunk=1000, overlap=200)  # Membuat instance dari AdvancedChunking dengan ukuran chunk dan overlap yang ditentukan
    FOLDER_PATH = "./data"  # Menentukan jalur folder yang berisi file PDF

    try:
        hasil_chunking = chunker.proses_folder_pdf(FOLDER_PATH)  # Memproses semua file PDF dalam folder
    except Exception as e:
        print(f"Terjadi error: {str(e)}")  # Menampilkan pesan kesalahan jika terjadi error

4. Keseluruhan Kode Program

In [6]:
import os  # Mengimpor modul os untuk operasi sistem file
import re  # Mengimpor modul re untuk ekspresi reguler
from PyPDF2 import PdfReader  # Mengimpor PdfReader dari PyPDF2 untuk membaca file PDF
from langchain.text_splitter import RecursiveCharacterTextSplitter  # Mengimpor RecursiveCharacterTextSplitter untuk membagi teks
from reportlab.lib.pagesizes import A4  # Mengimpor ukuran halaman A4 dari reportlab
from reportlab.pdfgen import canvas  # Mengimpor canvas dari reportlab untuk membuat PDF

class AdvancedChunking:
    def __init__(self, ukuran_chunk=1000, overlap=200):
        # Inisialisasi parameter untuk chunking
        self.ukuran_chunk = ukuran_chunk  # Ukuran maksimum setiap chunk
        self.overlap = overlap  # Jumlah karakter yang tumpang tindih antara chunk
        self.output_folder = 'output_chunks'  # Folder untuk menyimpan hasil chunking
        os.makedirs(self.output_folder, exist_ok=True)  # Membuat folder output jika belum ada

    def baca_pdf(self, path_file):
        """Membaca teks dari file PDF."""
        pdf_reader = PdfReader(path_file)  # Membuka file PDF
        teks = "".join([halaman.extract_text() or "" for halaman in pdf_reader.pages])  # Mengambil teks dari setiap halaman
        return teks  # Mengembalikan teks yang diekstrak

    def preprocessing_teks(self, teks):
        """Membersihkan teks dari spasi berlebih dan karakter tidak perlu."""
        return re.sub(r'\s+', ' ', teks).strip()  # Menghapus spasi berlebih dan mengembalikan teks yang telah dibersihkan

    def chunk_dengan_konteks(self, teks):
        """Membagi teks menjadi chunk dengan mempertahankan sedikit konteks dari chunk sebelumnya dan berikutnya."""
        teks_bersih = self.preprocessing_teks(teks)  # Membersihkan teks
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=self.ukuran_chunk,  # Ukuran chunk
            chunk_overlap=self.overlap,  # Jumlah overlap
            length_function=len,  # Fungsi untuk menghitung panjang
            separators=[".", "\n\n", "\n", " ", ""]  # Pemisah untuk chunking
        )

        chunks = text_splitter.create_documents([teks_bersih])  # Membagi teks menjadi chunk
        chunks_dengan_konteks = []  # Inisialisasi daftar untuk menyimpan chunk dengan konteks

        for i, chunk in enumerate(chunks):  # Iterasi melalui setiap chunk
            konteks_sebelum = chunks[i - 1].page_content if i > 0 else ""  # Ambil konteks dari chunk sebelumnya
            konteks_sesudah = chunks[i + 1].page_content if i < len(chunks) - 1 else ""  # Ambil konteks dari chunk berikutnya

            # Gabungkan konteks dengan chunk saat ini
            chunk_konteks = f"{konteks_sebelum} {chunk.page_content} {konteks_sesudah}".strip()
            chunk_konteks = chunk_konteks.rstrip('.') + '.'  # Pastikan chunk diakhiri dengan titik

            # Metadata untuk chunk
            metadata = {
                'chunk_id': i,
                'total_chunks': len(chunks),
                'panjang_chunk': len(chunk_konteks),
                'konteks_sebelum': konteks_sebelum[:50],  # Ambil 50 karakter pertama dari konteks sebelumnya
                'konteks_sesudah': konteks_sesudah[:50]   # Ambil 50 karakter pertama dari konteks berikutnya
            }

            chunks_dengan_konteks.append({'konten': chunk_konteks, 'metadata': metadata})  # Tambahkan chunk dan metadata ke daftar

        return chunks_dengan_konteks  # Mengembalikan daftar chunk dengan konteks

    def proses_folder_pdf(self, folder_path):
        """Memproses semua file PDF dalam folder dan membaginya menjadi chunk."""
        hasil = {}  # Inisialisasi dictionary untuk menyimpan hasil
        total_chunk = 0  # Inisialisasi total chunk

        for filename in os.listdir(folder_path):  # Iterasi melalui setiap file dalam folder
            if filename.endswith('.pdf'):  # Memeriksa apakah file adalah PDF
                file_path = os.path.join(folder_path, filename)  # Mendapatkan jalur lengkap file
                print(f"\n{'='*50}")  # Menampilkan garis pemisah
                print(f"Memproses file: {filename}")  # Menampilkan nama file yang sedang diproses
                print(f"{'='*50}")

                teks_pdf = self.baca_pdf(file_path)  # Membaca teks dari file PDF
                chunks = self.chunk_dengan_konteks(teks_pdf)  # Membagi teks menjadi chunk total_chunk += len(chunks)  # Menambahkan jumlah chunk yang dihasilkan
                hasil[filename] = {'chunks': chunks}  # Menyimpan hasil chunking dalam dictionary

                # Menyimpan hasil chunking ke file teks
                chunk_file = os.path.join(self.output_folder, f"{filename}_chunks.txt")  # Menentukan jalur untuk file output
                with open(chunk_file, "w", encoding="utf-8") as f:  # Membuka file untuk menulis
                    for i, chunk in enumerate(chunks, 1):  # Iterasi melalui setiap chunk
                        f.write(f"Chunk {i}:\n{chunk['konten']}\n\n")  # Menulis konten chunk ke file

                print(f"Hasil chunking disimpan di: {chunk_file}")  # Menampilkan pesan bahwa hasil chunking telah disimpan

        return hasil  # Mengembalikan hasil chunking

if __name__ == "__main__":
    chunker = AdvancedChunking(ukuran_chunk=1000, overlap=200)  # Membuat instance dari AdvancedChunking dengan ukuran chunk dan overlap yang ditentukan
    FOLDER_PATH = "./data"  # Menentukan jalur folder yang berisi file PDF

    try:
        hasil_chunking = chunker.proses_folder_pdf(FOLDER_PATH)  # Memproses semua file PDF dalam folder
    except Exception as e:
        print(f"Terjadi error: {str(e)}")  # Menampilkan pesan kesalahan jika terjadi error


Memproses file: dokumen_pdf.pdf
Hasil chunking disimpan di: output_chunks\dokumen_pdf.pdf_chunks.txt

Memproses file: tko.pdf
Hasil chunking disimpan di: output_chunks\tko.pdf_chunks.txt


### SEMANTIC

1. Import Library

In [None]:
import fitz  # Mengimpor PyMuPDF untuk membaca file PDF
import re  # Mengimpor modul re untuk ekspresi reguler

2. Fungsi read_pdf

In [None]:
def read_pdf(file_path):
    """Membaca teks dari file PDF."""
    doc = fitz.open(file_path)  # Membuka file PDF
    # Mengambil teks dari setiap halaman dan menggabungkannya dengan newline
    text = "\n".join([page.get_text("text") for page in doc])  
    return text  # Mengembalikan teks yang diekstrak dari PDF

3. Fungsi celan_text

In [None]:
def clean_text(text):
    """Membersihkan teks dari karakter khusus, spasi berlebih, dan format yang tidak diperlukan."""
    text = text.replace("\n", " ").strip()  # Mengganti newline dengan spasi dan menghapus spasi di awal/akhir
    text = " ".join(text.split())  # Menghapus spasi berlebih di antara kata
    return text  # Mengembalikan teks yang telah dibersihkan

4. Fungsi chunk_text

In [None]:
def chunk_text(text, chunk_size=1000):
    """Membagi teks menjadi chunk lebih kecil berdasarkan ukuran karakter."""
    # Membagi teks menjadi daftar chunk dengan ukuran maksimum chunk_size
    return [text[i:i + chunk_size] for i in range(0, len(text), chunk_size)]

5. Fungsi save_to_txt

In [None]:
def save_to_txt(filename, chunks):
    """Menyimpan hasil ke file .txt."""
    with open(filename, "w", encoding="utf-8") as f:  # Membuka file untuk menulis
        for i, chunk in enumerate(chunks):  # Iterasi melalui setiap chunk
            f.write(f"Chunk {i+1}:\n{chunk}\n\n")  # Menulis chunk ke file dengan header

6. Eksekusi Program

In [None]:
if __name__ == "__main__":
    # Path ke file PDF
    pdf_path = "./data/tko.pdf"  # Menentukan jalur ke file PDF
    output_txt = "./output_chunks/output_semantic.txt"  # Menentukan jalur untuk menyimpan file output

    # Membaca, membersihkan, dan membagi teks
    raw_text = read_pdf(pdf_path)
    cleaned_text = clean_text(raw_text)
    chunks = chunk_text(cleaned_text)

    # Menyimpan output ke file .txt
    save_to_txt(output_txt, chunks)

    print(f"Hasil telah disimpan dalam {output_txt}")

7. Keseluruhan Kode Program

In [None]:
import fitz  # Mengimpor PyMuPDF untuk membaca file PDF
import re  # Mengimpor modul re untuk ekspresi reguler

def read_pdf(file_path):
    """Membaca teks dari file PDF."""
    doc = fitz.open(file_path)  # Membuka file PDF
    # Mengambil teks dari setiap halaman dan menggabungkannya dengan newline
    text = "\n".join([page.get_text("text") for page in doc])  
    return text  # Mengembalikan teks yang diekstrak dari PDF

def clean_text(text):
    """Membersihkan teks dari karakter khusus, spasi berlebih, dan format yang tidak diperlukan."""
    text = text.replace("\n", " ").strip()  # Mengganti newline dengan spasi dan menghapus spasi di awal/akhir
    text = " ".join(text.split())  # Menghapus spasi berlebih di antara kata
    return text  # Mengembalikan teks yang telah dibersihkan

def chunk_text(text, chunk_size=1000):
    """Membagi teks menjadi chunk lebih kecil berdasarkan ukuran karakter."""
    # Membagi teks menjadi daftar chunk dengan ukuran maksimum chunk_size
    return [text[i:i + chunk_size] for i in range(0, len(text), chunk_size)]

def save_to_txt(filename, chunks):
    """Menyimpan hasil ke file .txt."""
    with open(filename, "w", encoding="utf-8") as f:  # Membuka file untuk menulis
        for i, chunk in enumerate(chunks):  # Iterasi melalui setiap chunk
            f.write(f"Chunk {i+1}:\n{chunk}\n\n")  # Menulis chunk ke file dengan header

if __name__ == "__main__":
    # Path ke file PDF
    pdf_path = "./data/tko.pdf"  # Menentukan jalur ke file PDF
    output_txt = "./output_chunks/output_semantic.txt"  # Menentukan jalur untuk menyimpan file output

    # Membaca, membersihkan, dan membagi teks
    raw_text = read_pdf(pdf_path)
    cleaned_text = clean_text(raw_text)
    chunks = chunk_text(cleaned_text)

    # Menyimpan output ke file .txt
    save_to_txt(output_txt, chunks)

    print(f"Hasil telah disimpan dalam {output_txt}")

Hasil telah disimpan dalam ./output_chunks/output_semantic.txt


### ADVANCE SEMANTIC

1. Import Library

In [None]:
import os  # Mengimpor modul os untuk operasi sistem file
import spacy  # Mengimpor spaCy untuk pemrosesan bahasa alami
import docx  # Mengimpor modul docx untuk membaca file DOCX
import fitz  # Mengimpor fitz (PyMuPDF) untuk membaca file PDF
import numpy as np  # Mengimpor numpy untuk operasi numerik
import gensim  # Mengimpor gensim untuk model topik
from gensim import corpora  # Mengimpor corpora dari gensim untuk membuat kamus
from langchain_huggingface import HuggingFaceEmbeddings  # Mengimpor model embedding dari Hugging Face
from langchain_ollama.llms import OllamaLLM  # Mengimpor model LLM dari Ollama

2. Kondigurasi Model

In [None]:
nlp = spacy.load("en_core_web_sm")  # Memuat model bahasa Inggris dari spaCy
OLLAMA_MODEL = "llama3.2"  # Menentukan model LLM yang akan digunakan
llm = OllamaLLM(model=OLLAMA_MODEL)  # Menginisialisasi model LLM
embedding_model = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")  # Memuat model embedding

3. Fungsi read_pdf

In [None]:
def read_pdf(file_path):
    print(f"Reading PDF file: {file_path}")  # Menampilkan pesan bahwa file PDF sedang dibaca
    text = ""  # Inisialisasi string kosong untuk menyimpan teks
    with fitz.open(file_path) as doc:  # Membuka file PDF
        for page in doc:  # Iterasi melalui setiap halaman
            text += page.get_text("text") + "\n"  # Mengambil teks dari halaman dan menambahkannya ke string
    return text  # Mengembalikan teks yang diekstrak

4. Fungsi read_docx

In [None]:
def read_docx(file_path):
    print(f"Reading DOCX file: {file_path}")  # Menampilkan pesan bahwa file DOCX sedang dibaca
    doc = docx.Document(file_path)  # Membuka file DOCX
    return "\n".join([para.text for para in doc.paragraphs])  # Menggabungkan teks dari setiap paragraf

5. Fungsi clean_text

In [None]:
def clean_text(text):
    print("Preprocessing text: Cleaning and normalizing...")  # Menampilkan pesan bahwa teks sedang diproses
    text = text.replace("\n", " ").strip()  # Mengganti newline dengan spasi dan menghapus spasi di awal/akhir
    text = " ".join(text.split())  # Menghapus spasi berlebih di antara kata
    return text  # Mengembalikan teks yang telah dibersihkan

6. Fungsi semantic_chunking

In [None]:
def semantic_chunking(text, chunk_size=1000):
    print("Performing semantic chunking...")  # Menampilkan pesan bahwa chunking sedang dilakukan
    doc = nlp(text)  # Memproses teks dengan spaCy
    sentences = [sent.text.strip() for sent in doc.sents if sent.text.strip()]  # Mengambil kalimat yang tidak kosong
    
    if not sentences:  # Jika tidak ada kalimat yang valid
        return []  # Mengembalikan daftar kosong
    
    chunks = []  # Inisialisasi daftar untuk menyimpan chunk
    chunk = ""  # Inisialisasi string untuk menyimpan teks chunk saat ini
    for sentence in sentences:  # Iterasi melalui setiap kalimat
        if len(chunk) + len(sentence) < chunk_size:  # Jika penambahan kalimat tidak melebihi ukuran chunk
            chunk += " " + sentence  # Tambahkan kalimat ke chunk
        else:
            chunks.append(chunk.strip())  # Tambahkan chunk yang telah dibentuk ke daftar
            chunk = sentence  # Mulai chunk baru dengan kalimat saat ini
    if chunk:  # Jika ada chunk yang tersisa
        chunks.append(chunk.strip())  # Tambahkan chunk terakhir ke daftar
    
    print(f"Generated {len(chunks)} chunks.")  # Menampilkan jumlah chunk yang dihasilkan
    return chunks  # Mengembalikan daftar chunk

7. Fungsi extract_topics

In [None]:
def extract_topics(text, num_topics=5):
    print("Extracting topics using LDA...")  # Menampilkan pesan bahwa topik sedang diekstrak
    words = [token.lemma_ for token in nlp(text) if token.is_alpha and not token.is_stop]  # Tokenisasi dan lemmatization
    dictionary = corpora.Dictionary([words])  # Membuat kamus dari kata-kata
    corpus = [dictionary.doc2bow(words)]  # Membuat korpus dari kata-kata
    lda_model = gensim .models.LdaMulticore(corpus, num_topics=num_topics, id2word=dictionary, passes=10, workers=4)  # Melatih model LDA dengan korpus
    topics = lda_model.show_topics(formatted=False)  # Mengambil topik yang dihasilkan dari model
    
    return topics[0] if topics else None  # Mengembalikan topik pertama jika ada, jika tidak mengembalikan None

8. Fungsi process_document

In [None]:
def process_document(file_path, output_folder):
    print(f"Processing document: {file_path}")  # Menampilkan pesan bahwa dokumen sedang diproses
    ext = file_path.split(".")[-1].lower()  # Mengambil ekstensi file
    
    if ext == "txt":  # Jika ekstensi adalah TXT
        text = read_txt(file_path)  # Membaca file TXT
    elif ext == "pdf":  # Jika ekstensi adalah PDF
        text = read_pdf(file_path)  # Membaca file PDF
    elif ext == "docx":  # Jika ekstensi adalah DOCX
        text = read_docx(file_path)  # Membaca file DOCX
    else:
        print(f"Unsupported file format: {ext}")  # Menampilkan pesan jika format file tidak didukung
        return  # Menghentikan eksekusi fungsi

9. Fungsi main

In [None]:
def main():
    input_folder = "./data"  # Menentukan folder input
    output_folder = "./output_chunks/"  # Menentukan folder output
    os.makedirs(output_folder, exist_ok=True)  # Membuat folder output jika belum ada
    
    for file_name in os.listdir(input_folder):  # Iterasi melalui setiap file dalam folder input
        file_path = os.path.join(input_folder, file_name)  # Menentukan jalur file
        if os.path.isfile(file_path):  # Jika jalur adalah file
            process_document(file_path, output_folder)  # Memproses dokumen

10. Eksekusi Program

In [None]:
if __name__ == "__main__":
    main()  # Menjalankan fungsi main jika file dijalankan sebagai skrip utama

11. Keseluruhan Kode Program

In [7]:
import os  # Mengimpor modul os untuk operasi sistem file
import spacy  # Mengimpor spaCy untuk pemrosesan bahasa alami
import docx  # Mengimpor modul docx untuk membaca file DOCX
import fitz  # Mengimpor fitz (PyMuPDF) untuk membaca file PDF
import numpy as np  # Mengimpor numpy untuk operasi numerik
import gensim  # Mengimpor gensim untuk model topik
from gensim import corpora  # Mengimpor corpora dari gensim untuk membuat kamus
from langchain_huggingface import HuggingFaceEmbeddings  # Mengimpor model embedding dari Hugging Face
from langchain_ollama.llms import OllamaLLM  # Mengimpor model LLM dari Ollama

# Memuat model NLP
nlp = spacy.load("en_core_web_sm")  # Memuat model bahasa Inggris dari spaCy
OLLAMA_MODEL = "llama3.2"  # Menentukan model LLM yang akan digunakan
llm = OllamaLLM(model=OLLAMA_MODEL)  # Menginisialisasi model LLM
embedding_model = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")  # Memuat model embedding

def read_txt(file_path):
    print(f"Reading TXT file: {file_path}")  # Menampilkan pesan bahwa file TXT sedang dibaca
    with open(file_path, "r", encoding="utf-8") as f:  # Membuka file dalam mode baca
        return f.read()  # Mengembalikan seluruh isi file

def read_pdf(file_path):
    print(f"Reading PDF file: {file_path}")  # Menampilkan pesan bahwa file PDF sedang dibaca
    text = ""  # Inisialisasi string kosong untuk menyimpan teks
    with fitz.open(file_path) as doc:  # Membuka file PDF
        for page in doc:  # Iterasi melalui setiap halaman
            text += page.get_text("text") + "\n"  # Mengambil teks dari halaman dan menambahkannya ke string
    return text  # Mengembalikan teks yang diekstrak

def read_docx(file_path):
    print(f"Reading DOCX file: {file_path}")  # Menampilkan pesan bahwa file DOCX sedang dibaca
    doc = docx.Document(file_path)  # Membuka file DOCX
    return "\n".join([para.text for para in doc.paragraphs])  # Menggabungkan teks dari setiap paragraf

def clean_text(text):
    print("Preprocessing text: Cleaning and normalizing...")  # Menampilkan pesan bahwa teks sedang diproses
    text = text.replace("\n", " ").strip()  # Mengganti newline dengan spasi dan menghapus spasi di awal/akhir
    text = " ".join(text.split())  # Menghapus spasi berlebih di antara kata
    return text  # Mengembalikan teks yang telah dibersihkan

def semantic_chunking(text, chunk_size=1000):
    print("Performing semantic chunking...")  # Menampilkan pesan bahwa chunking sedang dilakukan
    doc = nlp(text)  # Memproses teks dengan spaCy
    sentences = [sent.text.strip() for sent in doc.sents if sent.text.strip()]  # Mengambil kalimat yang tidak kosong
    
    if not sentences:  # Jika tidak ada kalimat yang valid
        return []  # Mengembalikan daftar kosong
    
    chunks = []  # Inisialisasi daftar untuk menyimpan chunk
    chunk = ""  # Inisialisasi string untuk menyimpan teks chunk saat ini
    for sentence in sentences:  # Iterasi melalui setiap kalimat
        if len(chunk) + len(sentence) < chunk_size:  # Jika penambahan kalimat tidak melebihi ukuran chunk
            chunk += " " + sentence  # Tambahkan kalimat ke chunk
        else:
            chunks.append(chunk.strip())  # Tambahkan chunk yang telah dibentuk ke daftar
            chunk = sentence  # Mulai chunk baru dengan kalimat saat ini
    if chunk:  # Jika ada chunk yang tersisa
        chunks.append(chunk.strip())  # Tambahkan chunk terakhir ke daftar
    
    print(f"Generated {len(chunks)} chunks.")  # Menampilkan jumlah chunk yang dihasilkan
    return chunks  # Mengembalikan daftar chunk

def extract_topics(text, num_topics=5):
    print("Extracting topics using LDA...")  # Menampilkan pesan bahwa topik sedang diekstrak
    words = [token.lemma_ for token in nlp(text) if token.is_alpha and not token.is_stop]  # Tokenisasi dan lemmatization
    dictionary = corpora.Dictionary([words])  # Membuat kamus dari kata-kata
    corpus = [dictionary.doc2bow(words)]  # Membuat korpus dari kata-kata
    lda_model = gensim .models.LdaMulticore(corpus, num_topics=num_topics, id2word=dictionary, passes=10, workers=4)  # Melatih model LDA dengan korpus
    topics = lda_model.show_topics(formatted=False)  # Mengambil topik yang dihasilkan dari model
    
    return topics[0] if topics else None  # Mengembalikan topik pertama jika ada, jika tidak mengembalikan None

def process_document(file_path, output_folder):
    print(f"Processing document: {file_path}")  # Menampilkan pesan bahwa dokumen sedang diproses
    ext = file_path.split(".")[-1].lower()  # Mengambil ekstensi file
    
    if ext == "txt":  # Jika ekstensi adalah TXT
        text = read_txt(file_path)  # Membaca file TXT
    elif ext == "pdf":  # Jika ekstensi adalah PDF
        text = read_pdf(file_path)  # Membaca file PDF
    elif ext == "docx":  # Jika ekstensi adalah DOCX
        text = read_docx(file_path)  # Membaca file DOCX
    else:
        print(f"Unsupported file format: {ext}")  # Menampilkan pesan jika format file tidak didukung
        return  # Menghentikan eksekusi fungsi
    
    cleaned_text = clean_text(text)  # Membersihkan teks yang dibaca
    chunks = semantic_chunking(cleaned_text)  # Melakukan chunking pada teks yang telah dibersihkan
    
    output_file = os.path.join(output_folder, os.path.basename(file_path).split(".")[0] + "_chunks.txt")  # Menentukan jalur untuk file output
    with open(output_file, "w", encoding="utf-8") as f:  # Membuka file output untuk menulis
        for i, chunk in enumerate(chunks):  # Iterasi melalui setiap chunk
            print(f"Processing chunk {i+1}...")  # Menampilkan pesan bahwa chunk sedang diproses
            best_topic = extract_topics(chunk)  # Mengekstrak topik dari chunk
            if best_topic:  # Jika ada topik yang ditemukan
                topic_words = ", ".join([word for word, _ in best_topic[1]])  # Menggabungkan kata-kata topik
                topic_str = f"Best Topic: {topic_words}"  # Menyusun string topik terbaik
            else:
                topic_str = "Best Topic: No topics found"  # Menyusun string jika tidak ada topik ditemukan
            f.write(f"--- Chunk {i+1} ---\n{chunk}\n{topic_str}\n\n")  # Menulis chunk dan topik ke file output
    
    print(f"Finished processing: {file_path} -> {output_file}")  # Menampilkan pesan bahwa pemrosesan selesai

def main():
    input_folder = "./data"  # Menentukan folder input
    output_folder = "./output_chunks/"  # Menentukan folder output
    os.makedirs(output_folder, exist_ok=True)  # Membuat folder output jika belum ada
    
    for file_name in os.listdir(input_folder):  # Iterasi melalui setiap file dalam folder input
        file_path = os.path.join(input_folder, file_name)  # Menentukan jalur file
        if os.path.isfile(file_path):  # Jika jalur adalah file
            process_document(file_path, output_folder)  # Memproses dokumen

if __name__ == "__main__":
    main()  # Menjalankan fungsi main jika file dijalankan sebagai skrip utama

Processing document: ./data\.gitkeep
Unsupported file format: gitkeep
Processing document: ./data\dokumen_pdf.pdf
Reading PDF file: ./data\dokumen_pdf.pdf
Preprocessing text: Cleaning and normalizing...
Performing semantic chunking...
Generated 5 chunks.
Processing chunk 1...
Extracting topics using LDA...
Processing chunk 2...
Extracting topics using LDA...
Processing chunk 3...
Extracting topics using LDA...
Processing chunk 4...
Extracting topics using LDA...
Processing chunk 5...
Extracting topics using LDA...
Finished processing: ./data\dokumen_pdf.pdf -> ./output_chunks/dokumen_pdf_chunks.txt
Processing document: ./data\tko.pdf
Reading PDF file: ./data\tko.pdf
Preprocessing text: Cleaning and normalizing...
Performing semantic chunking...
Generated 10 chunks.
Processing chunk 1...
Extracting topics using LDA...
Processing chunk 2...
Extracting topics using LDA...
Processing chunk 3...
Extracting topics using LDA...
Processing chunk 4...
Extracting topics using LDA...
Processing c

### AGENTIC

1. Import Library

In [None]:
import os  # Mengimpor modul os untuk operasi sistem file
import json  # Mengimpor modul json untuk pengelolaan data JSON
import re  # Mengimpor modul re untuk ekspresi reguler
from IPython.display import Markdown  # Mengimpor Markdown untuk menampilkan teks dalam format Markdown di Jupyter Notebook
from phi.agent import Agent  # Mengimpor Agent untuk penggunaan model LLM
from phi.model.ollama import Ollama  # Mengimpor Ollama untuk penggunaan model LLM
from langchain_huggingface import HuggingFaceEmbeddings  # Mengimpor model embedding dari Hugging Face
from langchain.vectorstores import FAISS  # Mengimpor FAISS untuk pengelolaan vektor
from langchain_core.documents import Document  # Mengimpor Document untuk pengelolaan dokumen
from utils.document_processor import DocumentProcessor  # Mengimpor DocumentProcessor untuk pengelolaan dokumen

2. Konfigurasi Model dan Parameter

In [None]:
# Inisialisasi path dan model
DATA_PATH = "./data"  # Menentukan folder data
INDEX_PATH = "faiss_index"  # Menentukan jalur untuk indeks FAISS
CHUNKED_DATA_PATH = "./output_chunks"  # Menentukan folder untuk hasil chunking
METADATA_PATH = "./metadata"  # Menentukan folder untuk metadata
OLLAMA_MODEL = "llama3.2"  # Menentukan model LLM yang akan digunakan

# Pastikan folder tersedia
os.makedirs(CHUNKED_DATA_PATH, exist_ok=True)  # Membuat folder untuk hasil chunking jika belum ada
os.makedirs(METADATA_PATH, exist_ok=True)  # Membuat folder untuk metadata jika belum ada

# Inisialisasi model Ollama
llm = Ollama(id=OLLAMA_MODEL)  # Menginisialisasi model LLM

# Inisialisasi embedding model
embedding_model = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2")  # Menginisialisasi model embedding

# Inisialisasi document processor
docs = DocumentProcessor()  # Menginisialisasi DocumentProcessor

# Inisialisasi Agent dengan model Ollama
agent = Agent(model=llm, show_tool_calls=True, markdown=True)  # Menginisialisasi Agent dengan model LLM

# Parameter chunking
CHUNK_SIZE = 1200  # Menentukan ukuran chunk
MIN_CHUNK_SIZE = 500  # Menentukan ukuran chunk minimum
MAX_CHUNKS = 30  # Menentukan jumlah chunk maksimum

extracted_docs = []  # Inisialisasi daftar untuk menyimpan dokumen yang telah diproses

3. Fungsi clean_text

In [None]:
def clean_text(text):
    """ Membersihkan teks dari karakter kosong, whitespace berlebih, dan simbol aneh """
    text = re.sub(r'\s+', ' ', text).strip()  # Mengganti spasi berlebih dengan satu spasi dan menghapus spasi di awal/akhir
    text = re.sub(r'[^\x00-\x7F]+', ' ', text)  # Mengganti karakter yang tidak dapat diwakili dengan spasi
    return text  # Mengembalikan teks yang telah dibersihkan

4. Fungsi clean_agent_output

In [None]:
def clean_agent_output(text):
    """ Membersihkan output dari agent agar tidak mengandung pemisah yang tidak perlu """
    text = re.sub(r'\n?###.*?\n', '\n', text)  # Menghapus judul markdown seperti ### Section
    text = re.sub(r'\n?\*\*\*.*?\n', '\n', text)  # Menghapus pemisah ***
    text = re.sub(r'\n?-{3,}\n?', '\n', text)  # Menghapus garis pemisah ---
    text = re.sub(r'(\s*-{2,}\s*)', ' ', text)  # Menghapus pemisah --
    text = re.sub(r'(\s*\*{2,}\s*)', ' ', text)  # Menghapus ** pemisah tebal
    text = re.sub(r'(\s*\*\s*)', ' ', text)  # Menghapus * pemisah tunggal
    text = re.sub(r'(\s*-\s*)', ' ', text)  # Menghapus pemisah -
    text = re.sub(r'\n{2,}', '\n\n', text).strip()  # Mengganti newline berlebih dengan dua newline
    return text  # Mengembalikan teks yang telah dibersihkan

5. Fungsi membaca, memproses, dan membagi dokumen teks

In [None]:
for filename in os.listdir(DATA_PATH):  # Iterasi melalui setiap file dalam folder data
    valid_extensions = ('.pdf', '.docx', '.txt')  # Menentukan ekstensi file yang valid
    if not filename.lower().endswith(valid_extensions):  # Jika file tidak memiliki ek stensi yang valid, lanjutkan ke file berikutnya
        continue  # Melewati file yang tidak valid

    filepath = os.path.join(DATA_PATH, filename)  # Menentukan jalur lengkap untuk file

    try:
        with open(filepath, "rb") as f:  # Membuka file dalam mode baca biner
            document = f.read()  # Membaca isi file
            result = docs.process_document(document, filename)  # Memproses dokumen menggunakan DocumentProcessor

        if not result or len(result) < 4:  # Memeriksa apakah hasil pemrosesan valid
            print(f"[WARNING] Gagal memproses {filename}, melewati file ini.")  # Menampilkan peringatan jika pemrosesan gagal
            continue  # Melewati file yang gagal diproses

        plain_text = clean_text(result[3])  # Mengambil dan membersihkan teks dari hasil pemrosesan
        print(f"[INFO] {filename} - Panjang teks sebelum pemrosesan: {len(plain_text)}")  # Menampilkan panjang teks yang dibaca

        # Gunakan agent untuk chunking secara bertahap
        structured_text = ""  # Inisialisasi string untuk menyimpan teks terstruktur
        start_idx = 0  # Inisialisasi indeks awal untuk chunking
        chunk_count = 0  # Inisialisasi penghitung chunk

        while start_idx < len(plain_text) and chunk_count < MAX_CHUNKS:  # Selama ada teks yang tersisa dan jumlah chunk belum mencapai maksimum
            chunk_text = plain_text[start_idx:start_idx + CHUNK_SIZE]  # Mengambil chunk teks
            response = agent.run(  # Menjalankan agent untuk memproses chunk
                f"Split the following text into meaningful segments ensuring logical separation:\n{chunk_text}",
                max_tokens=8000  # Menentukan batas token untuk respons
            )
            
            if isinstance(response, str):  # Jika respons adalah string
                structured_text += clean_agent_output(response) + "\n\n"  # Membersihkan dan menambahkan respons ke teks terstruktur
            elif isinstance(response, dict):  # Jika respons adalah dictionary
                structured_text += clean_agent_output(response.get("text", "")) + "\n\n"  # Mengambil teks dari dictionary
            else:  # Jika respons tidak sesuai dengan tipe yang diharapkan
                structured_text += clean_agent_output(getattr(response, "content", str(response))) + "\n\n"  # Mengambil konten respons

            start_idx += CHUNK_SIZE  # Memperbarui indeks awal untuk chunk berikutnya
            chunk_count += 1  # Meningkatkan penghitung chunk

        structured_text = structured_text.strip()  # Menghapus spasi di awal dan akhir teks terstruktur
        chunked_texts = structured_text.split("\n\n")  # Memisahkan teks terstruktur menjadi chunk berdasarkan dua newline

        # Gabungkan chunk yang terlalu pendek
        optimized_chunks = []  # Inisialisasi daftar untuk menyimpan chunk yang dioptimalkan
        temp_chunk = ""  # Inisialisasi string untuk menyimpan chunk sementara

        for chunk in chunked_texts:  # Iterasi melalui setiap chunk
            chunk = chunk.strip()  # Menghapus spasi di awal dan akhir chunk
            if len(chunk) < MIN_CHUNK_SIZE:  # Jika chunk lebih kecil dari ukuran minimum
                temp_chunk += " " + chunk  # Tambahkan chunk ke chunk sementara
            else:  # Jika chunk memenuhi ukuran minimum
                if temp_chunk:  # Jika ada chunk sementara yang tersimpan
                    optimized_chunks.append(temp_chunk.strip())  # Tambahkan chunk sementara ke daftar chunk yang dioptimalkan
                    temp_chunk = ""  # Reset chunk sementara
                optimized_chunks.append(chunk)  # Tambahkan chunk yang valid ke daftar

        if temp_chunk:  # Jika ada chunk sementara yang tersisa
            optimized_chunks.append(temp_chunk.strip())  # Tambahkan chunk terakhir ke daftar

        chunk_data = [{"chunk_id": i+1, "text": chunk.strip()}  # Membuat daftar chunk dengan ID dan teks
                      for i, chunk in enumerate(optimized_chunks[:MAX_CHUNKS]) if chunk.strip()]  # Hanya menyertakan chunk yang tidak kosong

        metadata = {  # Menyusun metadata untuk dokumen
            "filename": filename,  # Menyimpan nama file
            "total_chunks": len(chunk_data),  # Menyimpan jumlah total chunk
            "total_length": len(plain_text)  # Menyimpan panjang total teks
        }

        extracted_docs.extend([  # Menambahkan dokumen yang diekstrak ke daftar
            Document(page_content=chunk["text"], metadata={"chunk_id": chunk["chunk_id"], **metadata}) 
            for chunk in chunk_data
        ])

        # Simpan hasil chunking ke file
        chunked_filepath = os.path.join(CHUNKED_DATA_PATH, f"chunked_{filename}.txt")  # Menentukan jalur untuk file chunked
        with open(chunked_filepath, "w", encoding="utf-8") as chunked_file:  # Membuka file untuk menulis hasil chunking
            for chunk in chunk_data:  # Iterasi melalui setiap chunk
                chunked_file.write(f"Chunk {chunk['chunk_id']}:\n")  # Menulis ID chunk
                chunked_file.write(f"{chunk['text']}\n")  # Menulis teks chunk
                chunked_file.write("\n---\n\n")  # Menambahkan pemisah antar chunk

        metadata_filepath = os.path.join(METADATA_PATH, f"metadata_{filename}.json")  # Menentukan jalur untuk file metadata
        with open(metadata_filepath, "w", encoding="utf-8") as metadata_file:  # Membuka file untuk menulis metadata
            json.dump(metadata, metadata_file, indent=4)  # Menyimpan metadata dalam format JSON

        print(f"[INFO] Total chunks generated for {filename}: {len(chunk_data)}")  # Menampilkan informasi jumlah chunk yang dihasilkan

    except Exception as e:  # Menangkap kesalahan yang mungkin terjadi selama pemrosesan
        print(f"[ERROR] Error processing {filename}: {e}")  # Menampilkan pesan kesalahan

6. Fungsi menyimpan hasil pemrosesan

In [None]:
# Simpan ke FAISS hanya jika ada dokumen yang diproses
if extracted_docs:  # Jika ada dokumen yang berhasil diproses
    vector_store = FAISS.from_documents(extracted_docs, embedding_model)  # Membuat penyimpanan vektor dari dokumen yang diekstrak
    vector_store.save_local(INDEX_PATH)  # Menyimpan penyimpanan vektor secara lokal
    print("[SUCCESS] Proses chunking selesai. Hasilnya disimpan dalam 'chunked_data' dan metadata di 'metadata'.")  # Menampilkan pesan sukses
else:  # Jika tidak ada dokumen yang berhasil diproses
    print("[INFO] Tidak ada dokumen yang berhasil diproses.")  # Menampilkan informasi bahwa tidak ada dokumen yang diproses

7. Keseluruhan Kode Program

In [8]:
import os  # Mengimpor modul os untuk operasi sistem file
import json  # Mengimpor modul json untuk pengelolaan data JSON
import re  # Mengimpor modul re untuk ekspresi reguler
from IPython.display import Markdown  # Mengimpor Markdown untuk menampilkan teks dalam format Markdown di Jupyter Notebook
from phi.agent import Agent  # Mengimpor Agent untuk penggunaan model LLM
from phi.model.ollama import Ollama  # Mengimpor Ollama untuk penggunaan model LLM
from langchain_huggingface import HuggingFaceEmbeddings  # Mengimpor model embedding dari Hugging Face
from langchain.vectorstores import FAISS  # Mengimpor FAISS untuk pengelolaan vektor
from langchain_core.documents import Document  # Mengimpor Document untuk pengelolaan dokumen
from utils.document_processor import DocumentProcessor  # Mengimpor DocumentProcessor untuk pengelolaan dokumen

# Inisialisasi path dan model
DATA_PATH = "./data"  # Menentukan folder data
INDEX_PATH = "faiss_index"  # Menentukan jalur untuk indeks FAISS
CHUNKED_DATA_PATH = "./output_chunks"  # Menentukan folder untuk hasil chunking
METADATA_PATH = "./metadata"  # Menentukan folder untuk metadata
OLLAMA_MODEL = "llama3.2"  # Menentukan model LLM yang akan digunakan

# Pastikan folder tersedia
os.makedirs(CHUNKED_DATA_PATH, exist_ok=True)  # Membuat folder untuk hasil chunking jika belum ada
os.makedirs(METADATA_PATH, exist_ok=True)  # Membuat folder untuk metadata jika belum ada

# Inisialisasi model Ollama
llm = Ollama(id=OLLAMA_MODEL)  # Menginisialisasi model LLM

# Inisialisasi embedding model
embedding_model = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2")  # Menginisialisasi model embedding

# Inisialisasi document processor
docs = DocumentProcessor()  # Menginisialisasi DocumentProcessor

# Inisialisasi Agent dengan model Ollama
agent = Agent(model=llm, show_tool_calls=True, markdown=True)  # Menginisialisasi Agent dengan model LLM

# Parameter chunking
CHUNK_SIZE = 1200  # Menentukan ukuran chunk
MIN_CHUNK_SIZE = 500  # Menentukan ukuran chunk minimum
MAX_CHUNKS = 30  # Menentukan jumlah chunk maksimum

extracted_docs = []  # Inisialisasi daftar untuk menyimpan dokumen yang telah diproses

def clean_text(text):
    """ Membersihkan teks dari karakter kosong, whitespace berlebih, dan simbol aneh """
    text = re.sub(r'\s+', ' ', text).strip()  # Mengganti spasi berlebih dengan satu spasi dan menghapus spasi di awal/akhir
    text = re.sub(r'[^\x00-\x7F]+', ' ', text)  # Mengganti karakter yang tidak dapat diwakili dengan spasi
    return text  # Mengembalikan teks yang telah dibersihkan

def clean_agent_output(text):
    """ Membersihkan output dari agent agar tidak mengandung pemisah yang tidak perlu """
    text = re.sub(r'\n?###.*?\n', '\n', text)  # Menghapus judul markdown seperti ### Section
    text = re.sub(r'\n?\*\*\*.*?\n', '\n', text)  # Menghapus pemisah ***
    text = re.sub(r'\n?-{3,}\n?', '\n', text)  # Menghapus garis pemisah ---
    text = re.sub(r'(\s*-{2,}\s*)', ' ', text)  # Menghapus pemisah --
    text = re.sub(r'(\s*\*{2,}\s*)', ' ', text)  # Menghapus ** pemisah tebal
    text = re.sub(r'(\s*\*\s*)', ' ', text)  # Menghapus * pemisah tunggal
    text = re.sub(r'(\s*-\s*)', ' ', text)  # Menghapus pemisah -
    text = re.sub(r'\n{2,}', '\n\n', text).strip()  # Mengganti newline berlebih dengan dua newline
    return text  # Mengembalikan teks yang telah dibersihkan

for filename in os.listdir(DATA_PATH):  # Iterasi melalui setiap file dalam folder data
    valid_extensions = ('.pdf', '.docx', '.txt')  # Menentukan ekstensi file yang valid
    if not filename.lower().endswith(valid_extensions):  # Jika file tidak memiliki ek stensi yang valid, lanjutkan ke file berikutnya
        continue  # Melewati file yang tidak valid

    filepath = os.path.join(DATA_PATH, filename)  # Menentukan jalur lengkap untuk file

    try:
        with open(filepath, "rb") as f:  # Membuka file dalam mode baca biner
            document = f.read()  # Membaca isi file
            result = docs.process_document(document, filename)  # Memproses dokumen menggunakan DocumentProcessor

        if not result or len(result) < 4:  # Memeriksa apakah hasil pemrosesan valid
            print(f"[WARNING] Gagal memproses {filename}, melewati file ini.")  # Menampilkan peringatan jika pemrosesan gagal
            continue  # Melewati file yang gagal diproses

        plain_text = clean_text(result[3])  # Mengambil dan membersihkan teks dari hasil pemrosesan
        print(f"[INFO] {filename} - Panjang teks sebelum pemrosesan: {len(plain_text)}")  # Menampilkan panjang teks yang dibaca

        # Gunakan agent untuk chunking secara bertahap
        structured_text = ""  # Inisialisasi string untuk menyimpan teks terstruktur
        start_idx = 0  # Inisialisasi indeks awal untuk chunking
        chunk_count = 0  # Inisialisasi penghitung chunk

        while start_idx < len(plain_text) and chunk_count < MAX_CHUNKS:  # Selama ada teks yang tersisa dan jumlah chunk belum mencapai maksimum
            chunk_text = plain_text[start_idx:start_idx + CHUNK_SIZE]  # Mengambil chunk teks
            response = agent.run(  # Menjalankan agent untuk memproses chunk
                f"Split the following text into meaningful segments ensuring logical separation:\n{chunk_text}",
                max_tokens=8000  # Menentukan batas token untuk respons
            )
            
            if isinstance(response, str):  # Jika respons adalah string
                structured_text += clean_agent_output(response) + "\n\n"  # Membersihkan dan menambahkan respons ke teks terstruktur
            elif isinstance(response, dict):  # Jika respons adalah dictionary
                structured_text += clean_agent_output(response.get("text", "")) + "\n\n"  # Mengambil teks dari dictionary
            else:  # Jika respons tidak sesuai dengan tipe yang diharapkan
                structured_text += clean_agent_output(getattr(response, "content", str(response))) + "\n\n"  # Mengambil konten respons

            start_idx += CHUNK_SIZE  # Memperbarui indeks awal untuk chunk berikutnya
            chunk_count += 1  # Meningkatkan penghitung chunk

        structured_text = structured_text.strip()  # Menghapus spasi di awal dan akhir teks terstruktur
        chunked_texts = structured_text.split("\n\n")  # Memisahkan teks terstruktur menjadi chunk berdasarkan dua newline

        # Gabungkan chunk yang terlalu pendek
        optimized_chunks = []  # Inisialisasi daftar untuk menyimpan chunk yang dioptimalkan
        temp_chunk = ""  # Inisialisasi string untuk menyimpan chunk sementara

        for chunk in chunked_texts:  # Iterasi melalui setiap chunk
            chunk = chunk.strip()  # Menghapus spasi di awal dan akhir chunk
            if len(chunk) < MIN_CHUNK_SIZE:  # Jika chunk lebih kecil dari ukuran minimum
                temp_chunk += " " + chunk  # Tambahkan chunk ke chunk sementara
            else:  # Jika chunk memenuhi ukuran minimum
                if temp_chunk:  # Jika ada chunk sementara yang tersimpan
                    optimized_chunks.append(temp_chunk.strip())  # Tambahkan chunk sementara ke daftar chunk yang dioptimalkan
                    temp_chunk = ""  # Reset chunk sementara
                optimized_chunks.append(chunk)  # Tambahkan chunk yang valid ke daftar

        if temp_chunk:  # Jika ada chunk sementara yang tersisa
            optimized_chunks.append(temp_chunk.strip())  # Tambahkan chunk terakhir ke daftar

        chunk_data = [{"chunk_id": i+1, "text": chunk.strip()}  # Membuat daftar chunk dengan ID dan teks
                      for i, chunk in enumerate(optimized_chunks[:MAX_CHUNKS]) if chunk.strip()]  # Hanya menyertakan chunk yang tidak kosong

        metadata = {  # Menyusun metadata untuk dokumen
            "filename": filename,  # Menyimpan nama file
            "total_chunks": len(chunk_data),  # Menyimpan jumlah total chunk
            "total_length": len(plain_text)  # Menyimpan panjang total teks
        }

        extracted_docs.extend([  # Menambahkan dokumen yang diekstrak ke daftar
            Document(page_content=chunk["text"], metadata={"chunk_id": chunk["chunk_id"], **metadata}) 
            for chunk in chunk_data
        ])

        # Simpan hasil chunking ke file
        chunked_filepath = os.path.join(CHUNKED_DATA_PATH, f"chunked_{filename}.txt")  # Menentukan jalur untuk file chunked
        with open(chunked_filepath, "w", encoding="utf-8") as chunked_file:  # Membuka file untuk menulis hasil chunking
            for chunk in chunk_data:  # Iterasi melalui setiap chunk
                chunked_file.write(f"Chunk {chunk['chunk_id']}:\n")  # Menulis ID chunk
                chunked_file.write(f"{chunk['text']}\n")  # Menulis teks chunk
                chunked_file.write("\n---\n\n")  # Menambahkan pemisah antar chunk

        metadata_filepath = os.path.join(METADATA_PATH, f"metadata_{filename}.json")  # Menentukan jalur untuk file metadata
        with open(metadata_filepath, "w", encoding="utf-8") as metadata_file:  # Membuka file untuk menulis metadata
            json.dump(metadata, metadata_file, indent=4)  # Menyimpan metadata dalam format JSON

        print(f"[INFO] Total chunks generated for {filename}: {len(chunk_data)}")  # Menampilkan informasi jumlah chunk yang dihasilkan

    except Exception as e:  # Menangkap kesalahan yang mungkin terjadi selama pemrosesan
        print(f"[ERROR] Error processing {filename}: {e}")  # Menampilkan pesan kesalahan

# Simpan ke FAISS hanya jika ada dokumen yang diproses
if extracted_docs:  # Jika ada dokumen yang berhasil diproses
    vector_store = FAISS.from_documents(extracted_docs, embedding_model)  # Membuat penyimpanan vektor dari dokumen yang diekstrak
    vector_store.save_local(INDEX_PATH)  # Menyimpan penyimpanan vektor secara lokal
    print("[SUCCESS] Proses chunking selesai. Hasilnya disimpan dalam 'chunked_data' dan metadata di 'metadata'.")  # Menampilkan pesan sukses
else:  # Jika tidak ada dokumen yang berhasil diproses
    print("[INFO] Tidak ada dokumen yang berhasil diproses.")  # Menampilkan informasi bahwa tidak ada dokumen yang diproses

[INFO] dokumen_pdf.pdf - Panjang teks sebelum pemrosesan: 3843
[INFO] Total chunks generated for dokumen_pdf.pdf: 4
[INFO] tko.pdf - Panjang teks sebelum pemrosesan: 8838
[INFO] Total chunks generated for tko.pdf: 7
[SUCCESS] Proses chunking selesai. Hasilnya disimpan dalam 'chunked_data' dan metadata di 'metadata'.


### PENGETESAN DENGAN MODEL

1. Import Library

In [None]:
import os  # Mengimpor modul os untuk operasi sistem file
from IPython.display import display, Markdown, HTML  # Mengimpor fungsi untuk menampilkan output di Jupyter Notebook
from langchain_core.prompts import ChatPromptTemplate  # Mengimpor ChatPromptTemplate untuk membuat template prompt
from langchain_ollama.llms import OllamaLLM  # Mengimpor OllamaLLM untuk menggunakan model LLM

2. Fungsi read_chunks_ordered

In [None]:
def read_chunks_ordered(path):
    """Membaca file chunks dengan urutan yang benar"""
    files = []  # Inisialisasi daftar untuk menyimpan nama file
    for file in os.listdir(path):  # Iterasi melalui setiap file dalam folder
        if file.endswith('.txt') and file.startswith('chunk_'):  # Memeriksa apakah file adalah chunk
            files.append(file)  # Menambahkan file ke daftar jika memenuhi syarat
    
    # Sort files naturally (chunk_1.txt, chunk_2.txt, etc.)
    files.sort(key=lambda x: int(x.split('_')[1].split('.')[0]))  # Mengurutkan file berdasarkan nomor chunk

    all_text = []  # Inisialisasi daftar untuk menyimpan konten dari semua chunk
    for file in files:  # Iterasi melalui setiap file yang telah diurutkan
        file_path = os.path.join(path, file)  # Menentukan jalur lengkap untuk file
        try:
            with open(file_path, 'r', encoding='utf-8') as f:  # Membuka file dalam mode baca
                content = f.read()  # Membaca isi file
                all_text.append(content)  # Menambahkan konten ke daftar
                display(HTML(f"✅ Read {file}: {len(content)} characters"))  # Menampilkan pesan sukses
        except Exception as e:
            display(HTML(f"❌ Error reading {file}: {str(e)}"))  # Menampilkan pesan kesalahan jika terjadi error
    
    return "\n\n".join(all_text)  # Menggabungkan semua isi file dengan pemisah newline

3. Fungsi process_document

In [None]:
def process_document(chunk_folder):
    """Fungsi utama dengan parameter folder"""
    try:
        # Baca dokumen
        display(Markdown("### 1. Membaca dokumen..."))  # Menampilkan pesan bahwa dokumen sedang dibaca
        if not os.path.exists(chunk_folder):  # Memeriksa apakah folder ada
            raise FileNotFoundError(f"Folder tidak ditemukan: {chunk_folder}")  # Mengangkat kesalahan jika folder tidak ditemukan
        
        document_text = read_chunks_ordered(chunk_folder)  # Membaca semua chunk dari folder
        
        # Initialize model dengan temperature 0 untuk hasil konsisten
        display(Markdown("### 2. Inisialisasi model..."))  # Menampilkan pesan bahwa model sedang diinisialisasi
        model = OllamaLLM(  # Menginisialisasi model LLM
            model="llama3.2",  # Menentukan model yang digunakan
            temperature=0.0,  # Set ke 0 untuk hasil yang konsisten
            num_ctx=4096      # Konteks yang lebih besar
        )
        
        # Prompt yang lebih spesifik
        display(Markdown("### 3. Memproses dokumen..."))  # Menampilkan pesan bahwa dokumen sedang diproses
        template = """
        Anda adalah asisten yang SANGAT TELITI. Baca dokumen berikut dengan seksama dan jawab pertanyaan-pertanyaan berdasarkan HANYA informasi yang ada dalam dokumen.
        
        ATURAN PENTING:
        1. Gunakan HANYA informasi dari dokumen
        2. Jika informasi tidak ada, tulis "Informasi tidak ditemukan dalam dokumen"
        3. Jika informasi ditemukan, KUTIP LANGSUNG dari dokumen
        4. JANGAN menambahkan interpretasi atau asumsi
        
        DOKUMEN:
        {document}

        PERTANYAAN DAN JAWABAN:

        **1. Apa tujuan dari tata kerja organisasi?**
        [Kutip PERSIS dari dokumen atau tulis "Informasi tidak ditemukan dalam dokumen"]

        **2. Apa pengertian Onsite Support dan Aplikasi Upstream?**
        [Kutip PERSIS dari dokumen atau tulis "Informasi tidak ditemukan dalam dokumen"]

        **3. Apa dokumen dan referensi terkait Tata Kerja Organisasi?**
        [Kutip PERSIS dari dokumen atau tulis "Informasi tidak ditemukan dalam dokumen"]

        **4. Bagaimana prosedur permintaan baru akun aplikasi?**
        [Kutip PERSIS dari dokumen atau tulis "Informasi tidak ditemukan dalam dokumen"]
        """
        
        prompt = ChatPromptTemplate.from_template(template)  # Membuat template prompt dari string
        chain = prompt | model  # Menggabungkan prompt dengan model LLM
        
        # Proses dokumen
        response = chain.invoke({"document": document_text})  # Mengirimkan teks dokumen ke model untuk diproses
        
        # Tampilkan hasil
        display(Markdown("### 4. Hasil:"))  # Menampilkan judul untuk hasil
        display(Markdown(response))  # Menampilkan hasil dari model
        
    except Exception as e:
        display(HTML(f"<b style='color: red;'>Error:</b> {str(e)}"))  # Menampilkan pesan kesalahan jika terjadi error

4. Ekseskusi Program

In [None]:
folder_path = "output_chunks/overlap"  # Menentukan jalur folder yang berisi chunk
process_document(folder_path)  # Memanggil fungsi untuk memproses dokumen

5. Keseluruhan Kode Program

In [9]:
import os  # Mengimpor modul os untuk operasi sistem file
from IPython.display import display, Markdown, HTML  # Mengimpor fungsi untuk menampilkan output di Jupyter Notebook
from langchain_core.prompts import ChatPromptTemplate  # Mengimpor ChatPromptTemplate untuk membuat template prompt
from langchain_ollama.llms import OllamaLLM  # Mengimpor OllamaLLM untuk menggunakan model LLM

def read_chunks_ordered(path):
    """Membaca file chunks dengan urutan yang benar"""
    files = []  # Inisialisasi daftar untuk menyimpan nama file
    for file in os.listdir(path):  # Iterasi melalui setiap file dalam folder
        if file.endswith('.txt') and file.startswith('chunk_'):  # Memeriksa apakah file adalah chunk
            files.append(file)  # Menambahkan file ke daftar jika memenuhi syarat
    
    # Sort files naturally (chunk_1.txt, chunk_2.txt, etc.)
    files.sort(key=lambda x: int(x.split('_')[1].split('.')[0]))  # Mengurutkan file berdasarkan nomor chunk

    all_text = []  # Inisialisasi daftar untuk menyimpan konten dari semua chunk
    for file in files:  # Iterasi melalui setiap file yang telah diurutkan
        file_path = os.path.join(path, file)  # Menentukan jalur lengkap untuk file
        try:
            with open(file_path, 'r', encoding='utf-8') as f:  # Membuka file dalam mode baca
                content = f.read()  # Membaca isi file
                all_text.append(content)  # Menambahkan konten ke daftar
                display(HTML(f"✅ Read {file}: {len(content)} characters"))  # Menampilkan pesan sukses
        except Exception as e:
            display(HTML(f"❌ Error reading {file}: {str(e)}"))  # Menampilkan pesan kesalahan jika terjadi error
    
    return "\n\n".join(all_text)  # Menggabungkan semua isi file dengan pemisah newline

def process_document(chunk_folder):
    """Fungsi utama dengan parameter folder"""
    try:
        # Baca dokumen
        display(Markdown("### 1. Membaca dokumen..."))  # Menampilkan pesan bahwa dokumen sedang dibaca
        if not os.path.exists(chunk_folder):  # Memeriksa apakah folder ada
            raise FileNotFoundError(f"Folder tidak ditemukan: {chunk_folder}")  # Mengangkat kesalahan jika folder tidak ditemukan
        
        document_text = read_chunks_ordered(chunk_folder)  # Membaca semua chunk dari folder
        
        # Initialize model dengan temperature 0 untuk hasil konsisten
        display(Markdown("### 2. Inisialisasi model..."))  # Menampilkan pesan bahwa model sedang diinisialisasi
        model = OllamaLLM(  # Menginisialisasi model LLM
            model="llama3.2",  # Menentukan model yang digunakan
            temperature=0.0,  # Set ke 0 untuk hasil yang konsisten
            num_ctx=4096      # Konteks yang lebih besar
        )
        
        # Prompt yang lebih spesifik
        display(Markdown("### 3. Memproses dokumen..."))  # Menampilkan pesan bahwa dokumen sedang diproses
        template = """
        Anda adalah asisten yang SANGAT TELITI. Baca dokumen berikut dengan seksama dan jawab pertanyaan-pertanyaan berdasarkan HANYA informasi yang ada dalam dokumen.
        
        ATURAN PENTING:
        1. Gunakan HANYA informasi dari dokumen
        2. Jika informasi tidak ada, tulis "Informasi tidak ditemukan dalam dokumen"
        3. Jika informasi ditemukan, KUTIP LANGSUNG dari dokumen
        4. JANGAN menambahkan interpretasi atau asumsi
        
        DOKUMEN:
        {document}

        PERTANYAAN DAN JAWABAN:

        **1. Apa tujuan dari tata kerja organisasi?**
        [Kutip PERSIS dari dokumen atau tulis "Informasi tidak ditemukan dalam dokumen"]

        **2. Apa pengertian Onsite Support dan Aplikasi Upstream?**
        [Kutip PERSIS dari dokumen atau tulis "Informasi tidak ditemukan dalam dokumen"]

        **3. Apa dokumen dan referensi terkait Tata Kerja Organisasi?**
        [Kutip PERSIS dari dokumen atau tulis "Informasi tidak ditemukan dalam dokumen"]

        **4. Bagaimana prosedur permintaan baru akun aplikasi?**
        [Kutip PERSIS dari dokumen atau tulis "Informasi tidak ditemukan dalam dokumen"]
        """
        
        prompt = ChatPromptTemplate.from_template(template)  # Membuat template prompt dari string
        chain = prompt | model  # Menggabungkan prompt dengan model LLM
        
        # Proses dokumen
        response = chain.invoke({"document": document_text})  # Mengirimkan teks dokumen ke model untuk diproses
        
        # Tampilkan hasil
        display(Markdown("### 4. Hasil:"))  # Menampilkan judul untuk hasil
        display(Markdown(response))  # Menampilkan hasil dari model
        
    except Exception as e:
        display(HTML(f"<b style='color: red;'>Error:</b> {str(e)}"))  # Menampilkan pesan kesalahan jika terjadi error

# Jalankan dengan folder yang spesifik
folder_path = "output_chunks/overlap"  # Menentukan jalur folder yang berisi chunk
process_document(folder_path)  # Memanggil fungsi untuk memproses dokumen

### 1. Membaca dokumen...

### 2. Inisialisasi model...

### 3. Memproses dokumen...

### 4. Hasil:

Berikut adalah jawaban untuk pertanyaan-pertanyaan tersebut:

**1. Apa tujuan dari tata kerja organisasi?**

Tujuan dari Tata Kerja Organisasi (TKO) ini adalah untuk menjelaskan proses pengelolaan akun-akun yang memerlukan akses ke aplikasi Non-ERP yang meliputi aplikasi Non-ERP bisnis, aplikasi upstream dan infrastruktur pendukung lainnya di lingkungan PT. Pertamina Hulu Rokan – Zona Rokan.

**2. Apa pengertian Onsite Support dan Aplikasi Upstream?**

Pengertian dari Onsite Support adalah personel yang ditunjuk untuk mengontrol akses aplikasi dan/atau data dari suatu aplikasi vendor. Pengertian dari Aplikasi Upstream adalah perangkat lunak yang digunakan oleh Perusahaan untuk mendukung proses geology, geophysics, reservoir, production, facility, drilling, engineering (GGRPFDE).

**3. Apa dokumen dan referensi terkait Tata Kerja Organisasi?**

Dokumen dan referensi terkait Tata Kerja Organisasi adalah Undang-Undang Republik Indonesia No. 11 Tahun 2008 tentang Informasi dan Transaksi Elektronik, Pedoman PT Pertamina Hulu Energi No. A10-001/PHE53000/2021-S9 Rev. 0 tentang Penyelenggaraan Tata Kelola Teknologi Informasi Perusahaan, Pedoman PT Pertamina Hulu Energi No. A10-002/PHE53000/2021-S9 Rev. 0 tentang Pedoman Perencanaan Teknologi Informasi, dan Pedoman SKK Migas No. KEP-00008./SKO0000/2013/S0 tentang Tata Kerja Pengelolaan Teknologi Informasi dan Komunikasi Pada kontraktor Kontrak Kerja Sama.

**4. Bagaimana prosedur permintaan baru akun aplikasi?**

Prosedur permintaan baru akun aplikasi melalui proses User Access Review adalah sebagai berikut:

a. Permintaan Baru Akun Aplikasi
- Application Programmer membuat daftar akun yang berisikan kepemilikan akun dan hak akses yang sudah terdaftar untuk masing-masing aplikasi.
- Technical Writer mengirimkan daftar akun tersebut ke Information Steward melalui email dengan format email "User Access Review" berupa peninjauan daftar akun untuk akun yang perlu diperbarui dari aplikasi.

b. Peninjauan Daftar Akun
- Information Steward meninjau daftar akun aplikasi yang diterima kemudian mengirimkan kembali ke Technical Writer email UAR hasil tinjauan berupa daftar akun baru yang perlu ditambahkan akses ke aplikasi.

c. Persetujuan Daftar Hasil Peninjauan Akun
- Technical Writer menginfokan hasil peninjauan akun aplikasi yang sudah selesai dilakukan oleh Information Steward ke Business Analyst, untuk kemudian meminta persetujuan dari daftar hasil peninjauan akun yang telah selesai ditinjau oleh Information Steward.

d. Penyetujuan Daftar Hasil Peninjauan Akun
- Business Analyst menyetujui daftar hasil peninjauan akun dan meminta Technical Writer untuk membuat tiket terkait akun-akun yang perlu ditambahkan aksesnya ke aplikasi.

e. Pengecekan Terhadap Ketersediaan Lisensi
- Technical Writer membuat tiket untuk pengecekan terhadap ketersediaan lisensi kepada Onsite Support.
- Onsite Support melakukan pengecekan terhadap ketersediaan lisensi dan memberitahu ke Technical Writer jika lisensi tidak tersedia.

f. Penambahan Akses Akun Baru
- Jika aplikasi tersebut memerlukan akses database, maka lanjut ke proses 1j.
- Jika aplikasi tersebut tidak memerlukan akses database, maka lanjut ke proses 1k.
- IT IO – Database Administrator melakukan penambahan akun baru pada database aplikasi yang diminta dan memberitahu penambahan akses akun baru ke Technical Writer.

g. Penambahan Akses Akun Baru
- Application Programmer memberikan akses ke aplikasi yang diminta dan memberitahu penambahan akses akun baru ke Technical Writer.

h. Penutupan Tiket Permintaan Penambahan Akun Baru
- Technical Writer menutup tiket permintaan penambahan akun baru setelah mendapat konfirmasi dari masing-masing pemilik Akun (Pengguna).