In [70]:
from typing import List
import os
import numpy as np
import pandas as pd
import tensorflow as tf
from transformers import AutoTokenizer
from sentence_transformers import SentenceTransformer
import torch
from sklearn.model_selection import train_test_split, KFold
from sklearn.metrics import classification_report, accuracy_score, roc_auc_score
import re
import fitz
import spacy

In [71]:
print("GPU Available:", torch.cuda.is_available())
print("TensorFlow GPU:", tf.config.list_physical_devices('GPU'))

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

os.environ['SPACY_DATA'] = r'C:\Users\Salsa\PycharmProjects\pytorch\.venv\share\spacy_data'

GPU Available: False
TensorFlow GPU: []


In [75]:
class TextProcessorWithPyMuPDF:
    def __init__(self):
        self.model_name = "sentence-transformers/all-MiniLM-L6-v2"
        self.tokenizer = AutoTokenizer.from_pretrained(self.model_name)
        self.transformer = SentenceTransformer(self.model_name)
    
    def extract_from_pdf(self, pdf_path):
        text = ""
        with fitz.open(pdf_path) as doc:
            for page in doc:
                text += page.get_text() + "\n"
        return text

    def clean_text(self, text):
        text = re.sub(r'\s+', ' ', text)  # Remove excessive whitespace
        text = re.sub(r'\n+', '\n', text)
        text = re.sub(r'[^\w\s.,:]', '', text)
        text = re.sub(r'[^\w\s\-\'àáâãäåèéêëìíîïòóôõöùúûüýÿÀÁÂÃÄÅÈÉÊËÌÍÎÏÒÓÔÕÖÙÚÛÜÝ]', '', text)
        return text.strip()
    
    def hapus_duplikat(self, text: str) -> str:
        seen = set()
        result_text = []
        lines = text.split("\n")
        for line in lines:
            cleaned_line = line.strip()
            if cleaned_line and cleaned_line not in seen:
                seen.add(cleaned_line)
                result_text.append(cleaned_line)
        return "\n".join(result_text)
    
    def cut_isi(self, text: str) -> str:
        pola_2dapus = re.compile(r'(?<=daftar pustaka)(.*?)(?=daftar pustaka)', re.IGNORECASE)
        match = pola_2dapus.search(text)
        if match:
            hasil = match.group(1)
            return hasil
        else:
            pola_1dapus = re.compile(r'(.*?)(daftar pustaka)', re.IGNORECASE)
            cek_1dapus = pola_1dapus.search(text)
            if cek_1dapus:
                hasil = cek_1dapus.group(1)
                return hasil
            else:
                return "Tidak ditemukan kata 'daftar pustaka' sama sekali."

    def cut_daftar(self, text: str) -> str:
        pola_titik = re.compile(r'\.{10,}', re.DOTALL)
        matches = list(pola_titik.finditer(text))
        if matches:
            last_match = matches[-1]
            last_match_end = last_match.end()
            text = text[last_match_end:].strip()
        return text

    def segment_text(self, text, max_length=512):
        segments = re.split(r'(?<=[.!?])\s+', text)
        processed_segments, current_segment = [], ""
        for segment in segments:
            if len(current_segment) + len(segment) < max_length:
                current_segment += " " + segment if current_segment else segment
            else:
                processed_segments.append(current_segment)
                current_segment = segment
        if current_segment:
            processed_segments.append(current_segment)
        return processed_segments
    
    def generate_embeddings(self, texts):
        return self.transformer.encode(texts, convert_to_tensor=True)


In [77]:
pdf_dir = "Perkategori/"  # Directory containing PDF files
processor = TextProcessorWithPyMuPDF()

all_segments = []
for pdf_file in os.listdir(pdf_dir):
    if pdf_file.endswith(".pdf"):
        pdf_path = os.path.join(pdf_dir, pdf_file)
        raw_text = processor.extract_from_pdf(pdf_path)
        cleaned_text = processor.clean_text(raw_text)
        cleaned_text = processor.hapus_duplikat(cleaned_text)
        cleaned_text = processor.cut_isi(cleaned_text)
        cleaned_text = processor.cut_daftar(cleaned_text)
        segments = processor.segment_text(cleaned_text)
        all_segments.extend(segments)  # This will append all the segments to the list

print(f"Total segments processed: {len(all_segments)}")

# Generate Embeddings
embeddings = processor.generate_embeddings(all_segments)
embeddings_np = embeddings.cpu().numpy() if torch.cuda.is_available() else embeddings.numpy()

Total segments processed: 66


In [78]:
half_dim = embeddings_np.shape[1] // 2
text_data = embeddings_np[:, :half_dim]
explanation_data = embeddings_np[:, half_dim:]

input_data = {
    "text_input": text_data,
    "explanation_input": explanation_data
}

In [79]:
# Simulated Labels
num_samples = embeddings_np.shape[0]
understanding_labels = np.random.randint(0, 3, (num_samples, 3))  # One-hot encoded
completeness_labels = np.random.randint(0, 2, (num_samples,))    # Binary

output_data = {
    "understanding": understanding_labels,
    "completeness": completeness_labels
}

In [80]:
def create_tf_model(embedding_dim=384):
    text_input = tf.keras.layers.Input(shape=(embedding_dim // 2,), name='text_input')
    explanation_input = tf.keras.layers.Input(shape=(embedding_dim // 2,), name='explanation_input')
    concatenated = tf.keras.layers.Concatenate()([text_input, explanation_input])
    
    x = tf.keras.layers.Dense(512, activation='relu')(concatenated)
    x = tf.keras.layers.Dropout(0.2)(x)
    x = tf.keras.layers.Dense(256, activation='relu')(x)
    x = tf.keras.layers.Dropout(0.2)(x)
    
    understanding_score = tf.keras.layers.Dense(3, activation='softmax', name='understanding')(x)
    completeness_score = tf.keras.layers.Dense(1, activation='sigmoid', name='completeness')(x)
    
    model = tf.keras.Model(
        inputs=[text_input, explanation_input],
        outputs=[understanding_score, completeness_score]
    )
    model.compile(
        optimizer='adam',
        loss={
            'understanding': 'categorical_crossentropy',
            'completeness': 'binary_crossentropy'
        },
        metrics={
            'understanding': 'accuracy',
            'completeness': 'accuracy'
        }
    )
    return model

# Initialize Model
tf_model = create_tf_model()
tf_model.summary()

Model: "model_4"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 text_input (InputLayer)     [(None, 192)]                0         []                            
                                                                                                  
 explanation_input (InputLa  [(None, 192)]                0         []                            
 yer)                                                                                             
                                                                                                  
 concatenate_4 (Concatenate  (None, 384)                  0         ['text_input[0][0]',          
 )                                                                   'explanation_input[0][0]']   
                                                                                            

In [81]:
history = tf_model.fit(
    input_data,
    output_data,
    validation_split=0.2,
    epochs=10,
    batch_size=32,
    verbose=1
)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


In [82]:
# Cross-Validation
kfold = KFold(n_splits=5, shuffle=True, random_state=42)
fold_results = []

for train_idx, val_idx in kfold.split(text_data):
    X_train = {"text_input": text_data[train_idx], "explanation_input": explanation_data[train_idx]}
    X_val = {"text_input": text_data[val_idx], "explanation_input": explanation_data[val_idx]}
    
    y_train = {
        "understanding": understanding_labels[train_idx],
        "completeness": completeness_labels[train_idx]
    }
    y_val = {
        "understanding": understanding_labels[val_idx],
        "completeness": completeness_labels[val_idx]
    }
    
    tf_model.fit(X_train, y_train, validation_data=(X_val, y_val), epochs=5, batch_size=32, verbose=1)
    predictions = tf_model.predict(X_val)
    understanding_pred = predictions[0].argmax(axis=-1)
    completeness_pred = (predictions[1] > 0.5).astype(int)
    print("Fold complete.")

Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5
Fold complete.
Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5
Fold complete.
Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5
Fold complete.
Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5
Fold complete.
Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5
Fold complete.


In [83]:
# tf_model.save("feynmind_model")