### Mendefinisikan class dan fungsi Text Preprocessing

In [None]:
import logging
from Sastrawi.Stemmer.StemmerFactory import StemmerFactory
import re
import string
from nltk.tokenize import word_tokenize
from Sastrawi.StopWordRemover.StopWordRemoverFactory import StopWordRemoverFactory
from nltk.corpus import stopwords
import pandas as pd

# Configure the root logger: INFO and above, formatted as "<time> - <level> - <message>".
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

class TextCleaner:
    """Clean, tokenise, stopword-filter and stem a piece of text.

    The stopword set is the union of the Sastrawi stop-word list and NLTK's
    English list; stemming is delegated to the Sastrawi stemmer.
    """

    # Deletion table for every ASCII punctuation character, built once
    # instead of on every clean_text() call.
    _PUNCTUATION_TABLE = str.maketrans('', '', string.punctuation)

    def __init__(self):
        self.stemmer = StemmerFactory().create_stemmer()
        stopword_factory = StopWordRemoverFactory()
        self.combined_stopwords = set(stopword_factory.get_stop_words()).union(
            stopwords.words('english')
        )

    def clean_text(self, text):
        """Return ``text`` reduced to ASCII letters separated by single spaces.

        Quotes, ASCII punctuation, digits and every other non-letter
        character are deleted (not replaced by a space). Case is preserved.

        Args:
            text: The string to clean.

        Returns:
            The cleaned string with no leading, trailing or repeated
            whitespace.
        """
        text = re.sub(r'[\"“”]', '', text)
        text = text.translate(self._PUNCTUATION_TABLE)
        text = re.sub(r'\d+', '', text)
        text = re.sub(r'[^a-zA-Z\s]', '', text)
        # Whitespace is normalised last: deleting a symbol that stood between
        # two spaces (e.g. an emoji) would otherwise leave a double space, and
        # a leading symbol would leave a leading space.
        return re.sub(r'\s+', ' ', text).strip()

    def preprocess_text(self, text) -> str:
        """Clean, lower-case, tokenise, remove stopwords and stem ``text``.

        Args:
            text: The raw string to process.

        Returns:
            The stemmed tokens joined by single spaces (possibly ``""``).
        """
        text = self.clean_text(text).lower()
        # NOTE(review): word_tokenize normally needs NLTK's "punkt" tokenizer
        # data; this file only downloads "stopwords" - confirm it is installed.
        tokens = word_tokenize(text)
        tokens = [word for word in tokens if word not in self.combined_stopwords]
        stemmed = [self.stemmer.stem(word) for word in tokens]
        return ' '.join(stemmed)

class DataPreprocessing:
    """Run ``TextCleaner.preprocess_text`` over selected DataFrame columns."""

    def __init__(self, dataframe: pd.DataFrame, columns: list):
        """Store the frame, the column names to process and a ``TextCleaner``.

        Args:
            dataframe: Source data; it is never modified.
            columns: Names of the text columns to preprocess.
        """
        self.dataframe = dataframe
        self.columns = columns
        self.cleaner = TextCleaner()

    def exec(self) -> pd.DataFrame:
        """Return a copy of the frame with the configured columns preprocessed.

        Missing values become empty strings. Configured columns that do not
        exist in the frame are skipped with a warning.

        Returns:
            A new DataFrame; columns not listed in ``columns`` are unchanged.
        """
        df_copy = self.dataframe.copy()
        for col in self.columns:
            if col not in df_copy.columns:
                logging.warning("Column '%s' not found in DataFrame; skipped.", col)
                continue
            values = df_copy[col]
            # Blank out missing values *before* the str cast; otherwise
            # astype(str) turns them into the literal tokens "nan"/"None",
            # which would survive cleaning and pollute the corpus.
            as_text = values.astype(object).where(values.notna(), "").astype(str)
            df_copy[col] = as_text.apply(self.cleaner.preprocess_text)
        logging.info('> Preprocessing completed.')
        return df_copy

### Mendefinisikan class dan fungsi TFIDF

In [None]:
import csv
import string
import math
import logging
from collections import Counter
import pandas as pd
from sklearn.feature_extraction.text import TfidfVectorizer
import joblib

# Configure logging (repeated in this cell so it can run on its own; basicConfig
# does nothing when the root logger already has handlers).
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

class TFIDFVectorizerExporter:
    """Fit a ``TfidfVectorizer`` on one text column and persist the results.

    The fitted vectorizer is dumped with joblib to ``model_output`` and the
    dense TF-IDF matrix is written as CSV to ``output_file``.
    """

    def __init__(self, input_df: pd.DataFrame, output_file: str, text_column: str, model_output:str):
        """Validate and store the inputs.

        Args:
            input_df: Frame holding the documents.
            output_file: Destination of the TF-IDF matrix CSV.
            text_column: Name of the column containing the documents.
            model_output: Destination of the pickled vectorizer.

        Raises:
            TypeError: If ``input_df`` is not a DataFrame.
            ValueError: If ``text_column`` is not a column of ``input_df``.
        """
        if not isinstance(input_df, pd.DataFrame):
            raise TypeError("Input must be a DataFrame.")

        if text_column not in input_df.columns:
            raise ValueError(f"Column '{text_column}' not found in DataFrame.")

        self.input_df = input_df
        self.output_file = output_file
        self.text_column = text_column
        self.model_output = model_output

    def export_tfidf(self):
        """Fit the vectorizer, save it, export the matrix and return both forms.

        Returns:
            A ``(tfidf_matrix, tfidf_df)`` tuple: the matrix returned by
            ``fit_transform`` and its dense DataFrame with one column per
            feature name.

        Raises:
            Exception: Any failure is logged and then re-raised, so callers
                that unpack the result never receive ``None``.
        """
        try:
            # NOTE: the NaN-free, string-typed column is written back to the
            # caller's DataFrame, so later users of the same frame see it too.
            self.input_df[self.text_column] = self.input_df[self.text_column].fillna("").astype(str)

            vectorizer = TfidfVectorizer()
            tfidf_matrix = vectorizer.fit_transform(self.input_df[self.text_column])

            joblib.dump(vectorizer, self.model_output)

            feature_names = vectorizer.get_feature_names_out()
            # Dense conversion: memory grows with documents x vocabulary.
            tfidf_df = pd.DataFrame(tfidf_matrix.toarray(), columns=feature_names)

            tfidf_df.to_csv(self.output_file, index=False)

            logging.info(f"TF-IDF Vectorizer successfully exported to {self.output_file}")

            return tfidf_matrix, tfidf_df

        except ValueError as ve:
            logging.error(f"Error processing data: {ve}")
            raise
        except Exception as e:
            logging.error(f"Unexpected error: {e}")
            raise


class TFIDFCalculator:
    """Compute TF, DF, IDF and TF-IDF step by step and export them as CSV.

    Every per-document result is a dict keyed by every term of the corpus,
    so memory grows with documents x vocabulary.
    """

    def __init__(self, preprocessed_df: pd.DataFrame, output_file: str, text_column: str):
        """Validate and store the inputs.

        Args:
            preprocessed_df: Frame holding whitespace-separated documents.
            output_file: Destination CSV written by ``export_results``.
            text_column: Name of the column containing the documents.

        Raises:
            TypeError: If ``preprocessed_df`` is not a DataFrame.
            ValueError: If ``text_column`` is not a column of the frame.
        """
        if not isinstance(preprocessed_df, pd.DataFrame):
            raise TypeError("Input must be a DataFrame.")

        if text_column not in preprocessed_df.columns:
            raise ValueError(f"Column '{text_column}' not found in DataFrame.")

        self.preprocessed_df = preprocessed_df
        self.output_file = output_file
        self.text_column = text_column
        self.documents = []
        self.terms = set()

    def load_data(self):
        """Load the documents and collect the set of distinct terms."""
        column = self.preprocessed_df[self.text_column]
        # Empty documents read back from CSV arrive as NaN; treat them as ""
        # so the join/split calls below never see a non-string value.
        self.documents = column.astype(object).where(column.notna(), "").astype(str).tolist()
        self.terms = set(" ".join(self.documents).split())
        logging.info("Data loaded successfully.")

    def calculate_tf(self):
        """Return one ``(raw_counts, normalised_counts)`` pair per document.

        Normalised counts are ``count / total_terms_in_document`` rounded to
        four decimals; an empty document gets 0 for every term.
        """
        term_frequencies = []
        for doc in self.documents:
            counter = Counter(doc.split())
            total_terms = sum(counter.values())
            tf_raw = {term: counter.get(term, 0) for term in self.terms}
            if total_terms > 0:
                tf_norm = {
                    term: round(count / total_terms, 4)
                    for term, count in tf_raw.items()
                }
            else:
                tf_norm = {term: 0 for term in self.terms}
            term_frequencies.append((tf_raw, tf_norm))
        logging.info("Term frequencies calculated successfully.")
        return term_frequencies

    def calculate_df(self):
        """Return, for every term, the number of documents containing it."""
        df = {term: 0 for term in self.terms}
        for doc in self.documents:
            for term in set(doc.split()):
                df[term] += 1
        logging.info("Document frequencies calculated successfully.")
        return df

    def calculate_idf(self, df: dict) -> dict:
        """Return ``ln((1 + N) / (1 + df)) + 1`` per term, rounded to 4 decimals.

        Args:
            df: Document frequency per term, as returned by ``calculate_df``.
        """
        num_docs = len(self.documents)
        idf = {
            term: round(math.log((1 + num_docs) / (1 + df[term])) + 1, 4)
            for term in self.terms
        }
        logging.info("Inverse document frequencies calculated successfully.")
        return idf

    def calculate_tfidf(self, term_frequencies, idf):
        """Return ``(tfidf, tfidf_norm)``, each a list with one dict per document.

        ``tfidf`` multiplies the normalised TF by the IDF; ``tfidf_norm``
        divides each document's values by their Euclidean norm (all zeros
        when the norm is 0). Values are rounded to four decimals.

        Args:
            term_frequencies: Output of ``calculate_tf``.
            idf: Output of ``calculate_idf``.
        """
        tfidf = []
        tfidf_norm = []
        for _tf_raw, tf_norm in term_frequencies:
            tfidf_doc = {
                term: round(tf_norm[term] * idf[term], 4) for term in self.terms
            }
            norm = math.sqrt(sum(value**2 for value in tfidf_doc.values()))
            tfidf_norm_doc = (
                {term: round(value / norm, 4) for term, value in tfidf_doc.items()}
                if norm != 0
                else {term: 0 for term in self.terms}
            )
            tfidf.append(tfidf_doc)
            tfidf_norm.append(tfidf_norm_doc)
        logging.info("TF-IDF values calculated successfully.")
        return tfidf, tfidf_norm

    def export_results(self, term_frequencies, df, idf, tfidf, tfidf_norm):
        """Write one CSV row per term (sorted) with all computed statistics.

        Columns are: ``Term``, ``TF1..TFn``, ``TFN1..TFNn``, ``DF``, ``IDF``,
        ``TFIDF1..TFIDFn``, ``TFIDFN1..TFIDFNn`` where n is the document count.
        """
        doc_numbers = range(1, len(self.documents) + 1)
        with open(self.output_file, "w", newline="", encoding="utf-8") as file:
            writer = csv.writer(file)

            header = (
                ["Term"]
                + [f"TF{i}" for i in doc_numbers]
                + [f"TFN{i}" for i in doc_numbers]
                + ["DF", "IDF"]
                + [f"TFIDF{i}" for i in doc_numbers]
                + [f"TFIDFN{i}" for i in doc_numbers]
            )
            writer.writerow(header)

            for term in sorted(self.terms):
                row = [term]
                row += [doc_tf[0][term] for doc_tf in term_frequencies]
                row += [doc_tf[1][term] for doc_tf in term_frequencies]
                row.append(df[term])
                row.append(idf[term])
                row += [doc_tfidf[term] for doc_tfidf in tfidf]
                row += [doc_tfidf_norm[term] for doc_tfidf_norm in tfidf_norm]
                writer.writerow(row)
        logging.info(f"Results successfully exported to {self.output_file}")

    def process(self):
        """Run the whole pipeline: load, TF, DF, IDF, TF-IDF, then export."""
        self.load_data()
        term_frequencies = self.calculate_tf()
        df = self.calculate_df()
        idf = self.calculate_idf(df)
        tfidf, tfidf_norm = self.calculate_tfidf(term_frequencies, idf)
        self.export_results(term_frequencies, df, idf, tfidf, tfidf_norm)
        logging.info("TF-IDF calculation and export process completed.")


### Mendefinisikan class dan fungsi SVM

In [None]:
import pandas as pd
import joblib
from sklearn.preprocessing import LabelEncoder
from sklearn.svm import SVC
import logging
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
import csv
from pathlib import Path
import time


# Configure logging (repeated in this cell so it can run on its own; basicConfig
# does nothing when the root logger already has handlers).
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

def prepare_train_model(
    y_train_output,
    dataset: pd.DataFrame,
    dataset_colname: str = 'label'):
    """Encode the label column as integers and persist the fitted encoder.

    Args:
        y_train_output: Directory in which ``encoder.pkl`` is written.
        dataset: Frame containing the label column.
        dataset_colname: Name of the label column.

    Returns:
        The encoded labels produced by ``LabelEncoder.fit_transform``.
    """
    logging.info("===== Starting Model Preparation =====")

    # Pull the raw labels out of the frame.
    raw_labels = list(dataset[dataset_colname])
    logging.info("Loaded dataset with %d documents", len(raw_labels))

    # Fit the encoder on the labels and convert them to integers in one step.
    label_encoder = LabelEncoder()
    encoded_labels = label_encoder.fit_transform(raw_labels)

    # Persist the encoder so the same mapping can be reused at evaluation time.
    joblib.dump(label_encoder, f"{y_train_output}/encoder.pkl")
    logging.info("Label encoder saved to %s/encoder.pkl", y_train_output)

    logging.info("Pre-train dataset successfully saved")

    return encoded_labels


def train_model(kernel: str, X_train: pd.DataFrame, y_train: pd.Series, model_output_pathandname: str):
    """Fit an ``SVC`` with the given kernel and dump it with joblib.

    Args:
        kernel: One of 'sigmoid', 'linear', 'rbf' or 'poly'.
        X_train: Training feature matrix.
        y_train: Training labels.
        model_output_pathandname: Destination file of the pickled model.

    Raises:
        ValueError: If ``kernel`` is not one of the supported names.
    """
    supported_kernels = ('sigmoid', 'linear', 'rbf', 'poly')
    if kernel not in supported_kernels:
        logging.error("Invalid kernel: %s", kernel)
        raise ValueError("Kernel must be one of: 'sigmoid', 'linear', 'rbf', 'poly'")

    classifier = SVC(kernel=kernel)
    classifier.fit(X_train, y_train)

    # Persist the fitted classifier.
    joblib.dump(classifier, model_output_pathandname)
    logging.info("Model trained and saved to %s", model_output_pathandname)

def predict(test_text: str, X_trained_pkl, svm_model_pkl):
    """Predict the label of a single text with a saved vectorizer and model.

    Args:
        test_text: Raw text to classify; it is run through ``TextCleaner``.
        X_trained_pkl: Path (``str`` or ``Path``) of the pickled TF-IDF
            vectorizer.
        svm_model_pkl: Path (``str`` or ``Path``) of the pickled SVM model.

    Returns:
        The first element of the model's prediction, or ``None`` when a file
        is missing, the loaded object has no ``predict`` method, or any step
        raises.
    """
    logging.info("📌 Memulai proses prediksi...")

    # Accept plain string paths as well: .exists() and .stem below are
    # pathlib.Path methods and would raise AttributeError on a str.
    X_trained_pkl = Path(X_trained_pkl)
    svm_model_pkl = Path(svm_model_pkl)

    # Make sure both model files are present.
    if not X_trained_pkl.exists():
        logging.error(f"🚨 Model TF-IDF {X_trained_pkl} tidak ditemukan.")
        return None

    if not svm_model_pkl.exists():
        logging.error(f"🚨 Model SVM {svm_model_pkl} tidak ditemukan.")
        return None

    try:
        # Preprocess the input text the same way as the training data.
        preprocessor = TextCleaner()
        preprocessed_text = preprocessor.preprocess_text(test_text)

        # Load the TF-IDF vectorizer and vectorise the text.
        logging.info("🔄 Memuat model TF-IDF...")
        tfidf_model = joblib.load(X_trained_pkl)
        X_test = tfidf_model.transform([preprocessed_text])

        # Convert to a dense array if the result is still a sparse matrix.
        if hasattr(X_test, "toarray"):
            X_test = X_test.toarray()

        # Load the SVM model.
        logging.info(f"🔄 Memuat model SVM dari {svm_model_pkl.stem}...")
        svm_model = joblib.load(svm_model_pkl)

        # Make sure the loaded object can predict.
        if not hasattr(svm_model, "predict"):
            logging.error("🚨 Model yang dimuat bukan model SVM yang valid.")
            return None

        # Run the prediction.
        prediction = svm_model.predict(X_test)
        logging.info(f"✅ Hasil prediksi: {prediction[0]}")

        return prediction[0]

    except Exception as e:
        # Deliberate best effort: report the failure and signal it with None.
        logging.error(f"🚨 Terjadi kesalahan saat melakukan prediksi: {e}")
        return None


def evaluate_and_export_results(model, X_test, y_test, kernel: str, output_csv_path: str):
    """Score ``model`` on the test data and append one result row to a CSV.

    Args:
        model: Fitted estimator exposing ``predict``.
        X_test: Test feature matrix.
        y_test: True test labels.
        kernel: Kernel name recorded in the ``Kernel`` column.
        output_csv_path: CSV file to append to; the header is written only
            when the file does not exist yet.
    """
    logging.info("📊 Mengevaluasi performa model SVM...")

    # Time only the prediction call.
    started_at = time.time()
    y_pred = model.predict(X_test)
    elapsed_seconds = time.time() - started_at

    weighted = {"average": 'weighted', "zero_division": 0}
    row = {
        "Kernel": kernel,
        "Accuracy": accuracy_score(y_test, y_pred),
        "Precision": precision_score(y_test, y_pred, **weighted),
        "Recall": recall_score(y_test, y_pred, **weighted),
        "F1_Score": f1_score(y_test, y_pred, **weighted),
        "Inference_Time_Seconds": elapsed_seconds,
    }

    needs_header = not Path(output_csv_path).exists()

    # Append to the CSV, emitting the header only for a brand-new file.
    with open(output_csv_path, mode='a', newline='') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=list(row))
        if needs_header:
            writer.writeheader()
        writer.writerow(row)

    logging.info("📁 Hasil evaluasi diekspor ke %s", output_csv_path)


## Latih model SVM

#### Persiapan data, folder, dan fungsi

In [None]:
import os
import sys
import logging
import pandas as pd
from pathlib import Path
from sklearn.metrics import accuracy_score, classification_report
import joblib
import matplotlib.pyplot as plt
import seaborn as sns
from wordcloud import WordCloud
from sklearn.metrics import accuracy_score, classification_report, precision_score, recall_score, f1_score
from sklearn.model_selection import train_test_split

import nltk
# Fetch the NLTK stop-word corpus read by TextCleaner (stopwords.words('english')).
# NOTE(review): TextCleaner also calls nltk.tokenize.word_tokenize, which normally
# needs the "punkt" tokenizer data - confirm it is installed in this environment.
nltk.download('stopwords')

# Set up logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")

# ----------------------------------------------------------------
# **Konfigurasi Path**
# ----------------------------------------------------------------
# Absolute path of the current working directory; everything below lives under it.
BASE_DIR = Path().resolve()

# Output directories for each pipeline stage
result_path = BASE_DIR / "result"
preprocessed_path = result_path / "preprocessed"
tfidf_path = result_path / "tfidf"
model_path = result_path / "model"
predict_path = result_path / "predict"

# Create the directories if they do not exist yet
for path in [result_path, preprocessed_path, tfidf_path, model_path, predict_path]:
    path.mkdir(parents=True, exist_ok=True)

# ----------------------------------------------------------------
# **Fungsi Preprocessing**
# ----------------------------------------------------------------
def preprocessing():
    """Preprocess the ``comment`` column of ``dataset.csv`` and save the result.

    Reads ``BASE_DIR / "dataset.csv"`` and writes
    ``preprocessed_path / "preprocessed.csv"`` without the index.
    """
    print('Preprocessing.......')
    raw_frame = pd.read_csv(BASE_DIR / "dataset.csv")
    cleaned_frame = DataPreprocessing(raw_frame, ['comment']).exec()

    # Persist the cleaned data as CSV.
    output_csv = preprocessed_path / "preprocessed.csv"
    cleaned_frame.to_csv(output_csv, index=False)
    logging.info(f"✅ Preprocessed data disimpan di {output_csv}")
    print('preprocessing done')

# ----------------------------------------------------------------
# **Fungsi Training TF-IDF**
# ----------------------------------------------------------------
def training_tfidf(train_size=0.8):
    """Split the preprocessed data and build both TF-IDF exports from the training part.

    Reads ``preprocessed.csv``, writes ``training.csv`` and ``testing.csv``,
    then produces the vectorizer-based export (``tfidf_vec.csv`` plus
    ``X_trained.pkl``) and the manual calculation (``tfidf_manual.csv``).

    Args:
        train_size: Fraction of rows used for training.

    Raises:
        RuntimeError: If the vectorizer export reports failure by returning
            ``None``.
    """
    print("📌 Memulai training TF-IDF...")

    # Read the preprocessed data.
    data_preprocessed = pd.read_csv(preprocessed_path / "preprocessed.csv")

    # Split into training and testing sets (fixed seed for reproducibility).
    df_train, df_test = train_test_split(
        data_preprocessed, train_size=train_size, random_state=42, shuffle=True
    )

    # Save both splits as CSV.
    training_file = preprocessed_path / "training.csv"
    testing_file = preprocessed_path / "testing.csv"
    df_train.to_csv(training_file, index=False)
    df_test.to_csv(testing_file, index=False)

    logging.info(f"✅ Data training disimpan di {training_file}")
    logging.info(f"✅ Data testing disimpan di {testing_file}")

    # Fit the TF-IDF vectorizer on the training data only.
    tfidf_vec_file = tfidf_path / "tfidf_vec.csv"
    X_train_pkl = model_path / "X_trained.pkl"

    tfidf_vec = TFIDFVectorizerExporter(df_train, tfidf_vec_file, "comment", X_train_pkl)
    # Guard against an export that signals failure with None: unpacking it
    # would raise an unrelated "cannot unpack NoneType" TypeError that hides
    # the real problem.
    if tfidf_vec.export_tfidf() is None:
        raise RuntimeError(
            f"TF-IDF export failed; {tfidf_vec_file} and {X_train_pkl} were not produced."
        )

    logging.info(f"✅ TF-IDF vectorized data disimpan di {tfidf_vec_file}")

    # Manual, step-by-step TF-IDF calculation on the same training data.
    tfidf_manual_file = tfidf_path / "tfidf_manual.csv"
    tfidf_manual = TFIDFCalculator(df_train, tfidf_manual_file, "comment")
    tfidf_manual.process()

    logging.info(f"TF-IDF manual calculation disimpan di {tfidf_manual_file}")
    print("✅ Training TF-IDF selesai.")


# ----------------------------------------------------------------
# **Fungsi Training SVM (Dynamically Train All Models)**
# ----------------------------------------------------------------
def training_svm(kernel: str):
    """Train one SVM with ``kernel`` on the saved TF-IDF features and labels.

    Reads ``training.csv`` for the labels and ``tfidf_vec.csv`` for the
    features, then saves the model as ``model_path / "<kernel>.pkl"``.

    Args:
        kernel: Kernel name forwarded to ``train_model``.
    """
    print("📌 Memulai training SVM...")

    model_file = model_path / f"{kernel}.pkl"

    # Labels: encode the label column of the training split.
    training_rows = pd.read_csv(preprocessed_path / "training.csv")
    encoded_labels = prepare_train_model(model_path, training_rows)

    # Features: the TF-IDF matrix exported during TF-IDF training.
    features = pd.read_csv(tfidf_path / "tfidf_vec.csv")

    # Fit and persist the model.
    train_model(kernel, features, encoded_labels, model_file)

    logging.info(f"✅ Model {kernel} disimpan di {model_file}")
    print("✅ Training SVM selesai.")

# ----------------------------------------------------------------
# **Fungsi Prediksi (Dinamis)**
# ----------------------------------------------------------------
def run_prediction(test_text, model_name=None):
    """Predict the label of ``test_text`` with one of the saved SVM models.

    Args:
        test_text: Raw text to classify.
        model_name: Stem of the model file inside ``model_path`` (for example
            ``"linear"`` for ``linear.pkl``). When ``None``, the first saved
            model in alphabetical order is used.

    Returns:
        Whatever ``predict`` returns: the predicted label, or ``None`` on
        failure.

    Exits the process with status 1 when the TF-IDF vectorizer file is
    missing, or when no model name is given and no saved model exists.
    """
    logging.info("📌 Memulai prediksi...")

    # The fitted TF-IDF vectorizer must exist before anything can be predicted.
    X_trained_pkl = model_path / "X_trained.pkl"
    if not X_trained_pkl.exists():
        logging.error("🚨 File TF-IDF (X_trained.pkl) tidak ditemukan. Silakan lakukan training terlebih dahulu.")
        sys.exit(1)

    # Resolve the model *file*: predict() takes exactly three arguments and
    # loads the model itself from a path, so it must not be handed an
    # already-loaded model or an extra output-path argument.
    if model_name is None:
        # Same filter as evaluate_model(): skip the encoder and the vectorizer.
        candidates = sorted(
            m for m in model_path.glob("*.pkl")
            if "encoder" not in m.stem and "X_trained" not in m.stem
        )
        if not candidates:
            logging.error("🚨 Tidak ada model SVM yang tersedia. Pastikan telah melakukan training.")
            sys.exit(1)
        svm_model_pkl = candidates[0]
    else:
        svm_model_pkl = model_path / f"{model_name}.pkl"
    model_filename = svm_model_pkl.stem

    # Run the prediction.
    result = predict(test_text, X_trained_pkl, svm_model_pkl)

    logging.info(f"✅ Hasil prediksi ({model_filename}): {result}")
    return result

# ----------------------------------------------------------------
# **Fungsi Evaluasi Akurasi Model**
# ----------------------------------------------------------------
def evaluate_model():
    """Evaluate every saved SVM model on ``testing.csv`` and write the reports.

    For each model file in ``model_path`` (excluding the encoder and the
    vectorizer) this computes accuracy and weighted precision/recall/F1,
    writes ``evaluation_<model>.txt`` into ``predict_path``, and finally
    plots all results. Word clouds of the test comments are generated first.

    Exits the process with status 1 when a required input file is missing,
    the test data is empty, or no model is available. A failure while
    evaluating one model is logged and the remaining models are still
    evaluated.
    """
    logging.info("📌 Memulai evaluasi model...")

    # Load the test data from the CSV file, not from a database.
    testing_file = preprocessed_path / "testing.csv"
    if not testing_file.exists():
        logging.error("🚨 File testing.csv tidak ditemukan. Jalankan training_tfidf() terlebih dahulu.")
        sys.exit(1)

    data_testing = pd.read_csv(testing_file)

    if data_testing.empty:
        logging.error("🚨 Data testing.csv kosong. Silakan pastikan data tersedia.")
        sys.exit(1)

    # Comments that became empty during preprocessing are read back from CSV
    # as NaN; turn them into "" so the vectorizer and the word-cloud join
    # only ever see strings.
    raw_comments = data_testing["comment"]
    X_test = raw_comments.astype(object).where(raw_comments.notna(), "").astype(str)
    y_test = data_testing["label"]

    # Load the LabelEncoder fitted during training.
    encoder_pkl = model_path / "encoder.pkl"
    if not encoder_pkl.exists():
        logging.error("🚨 File encoder.pkl tidak ditemukan. Pastikan telah melakukan training.")
        sys.exit(1)

    label_encoder = joblib.load(encoder_pkl)

    # Encode the test labels with the same mapping as the training labels.
    y_test_encoded = label_encoder.transform(y_test)

    # Generate word clouds from the test data.
    generate_wordcloud(X_test, y_test_encoded, label_encoder)

    # Load the TF-IDF vectorizer fitted during training.
    X_train_pkl = model_path / "X_trained.pkl"
    if not X_train_pkl.exists():
        logging.error("🚨 File TF-IDF vectorizer tidak ditemukan. Pastikan proses training telah dilakukan.")
        sys.exit(1)

    tfidf_vectorizer = joblib.load(X_train_pkl)

    # Vectorise the test data; column names match the training feature names.
    X_test_tfidf_array = tfidf_vectorizer.transform(X_test)
    X_test_tfidf = pd.DataFrame(
        X_test_tfidf_array.toarray(),
        columns=tfidf_vectorizer.get_feature_names_out()
    )

    # Collect the available model files.
    available_models = [
        m for m in model_path.glob("*.pkl")
        if "encoder" not in m.stem and "X_trained" not in m.stem
    ]

    if not available_models:
        logging.error("🚨 Tidak ada model SVM yang tersedia. Pastikan telah melakukan training.")
        sys.exit(1)

    # Report every class known to the encoder, even when the test split
    # happens to contain no sample of it; otherwise target_names would not
    # line up with the labels found in the data.
    class_ids = list(range(len(label_encoder.classes_)))
    class_names = [str(name) for name in label_encoder.classes_]

    results = []

    for model_file in available_models:
        model_name = model_file.stem
        logging.info(f"🔍 Evaluasi model: {model_name}")

        try:
            model = joblib.load(model_file)

            if not hasattr(model, "predict"):
                logging.warning(f"⚠️ {model_name} bukan model SVM yang valid. Melewati...")
                continue

            y_pred = model.predict(X_test_tfidf)

            # zero_division=0 matches evaluate_and_export_results(): a class
            # that is never predicted scores 0 without emitting a warning.
            accuracy = accuracy_score(y_test_encoded, y_pred)
            precision = precision_score(y_test_encoded, y_pred, average="weighted", zero_division=0)
            recall = recall_score(y_test_encoded, y_pred, average="weighted", zero_division=0)
            f1 = f1_score(y_test_encoded, y_pred, average="weighted", zero_division=0)
            report = classification_report(
                y_test_encoded,
                y_pred,
                labels=class_ids,
                target_names=class_names,
                zero_division=0,
            )

            results.append({
                "Model": model_name,
                "Accuracy": accuracy,
                "Precision": precision,
                "Recall": recall,
                "F1-Score": f1
            })

            eval_file = predict_path / f"evaluation_{model_name}.txt"
            with open(eval_file, "w", encoding="utf-8") as f:
                f.write(f"Model: {model_name}\n")
                f.write(f"Akurasi: {accuracy:.4f}\n")
                f.write(f"Precision: {precision:.4f}\n")
                f.write(f"Recall: {recall:.4f}\n")
                f.write(f"F1-Score: {f1:.4f}\n\n")
                f.write("Classification Report:\n")
                f.write(report)

            logging.info(f"✅ Evaluasi selesai untuk model {model_name}")
            logging.info(f"📄 Hasil disimpan di {eval_file}")

        except Exception as e:
            # Best effort per model: log the failure and move on to the next.
            logging.error(f"🚨 Error saat evaluasi model {model_name}: {e}")
            continue

    if results:
        plot_evaluation_results(results)

    logging.info("✅ Evaluasi semua model selesai.")
    print("Evaluasi selesai")

# ----------------------------------------------------------------
# **Fungsi untuk Membuat Grafik Evaluasi**
# ----------------------------------------------------------------
def plot_evaluation_results(results):
    """Draw a grouped bar chart of the evaluation metrics and save it as PNG.

    Args:
        results: List of dicts with a ``"Model"`` key plus one numeric entry
            per metric; each model becomes one group of bars.
    """
    metrics_by_model = pd.DataFrame(results).set_index("Model")

    # Seaborn theme for a cleaner look.
    sns.set_theme(style="whitegrid")

    # One group of bars per model, one bar per metric.
    fig, ax = plt.subplots(figsize=(10, 6))
    metrics_by_model.plot(kind="bar", ax=ax, colormap="coolwarm")

    # Print the value above every bar.
    for bars in ax.containers:
        ax.bar_label(bars, fmt="%.2f", label_type="edge", fontsize=10, padding=3)

    # Title, axis labels and axis formatting.
    plt.title("Evaluasi Model: Akurasi, Precision, Recall, F1-Score", fontsize=14)
    plt.xlabel("Model", fontsize=12)
    plt.ylabel("Score", fontsize=12)
    plt.xticks(rotation=45)
    plt.ylim(0, 1)  # all metrics lie in the range 0-1

    # Save the figure and release it.
    eval_img_path = predict_path / "model_evaluation.png"
    plt.savefig(eval_img_path, bbox_inches="tight")
    plt.close()

    logging.info(f"📊 Grafik evaluasi disimpan di {eval_img_path}")

# ----------------------------------------------------------------
# **Fungsi untuk Membuat WordCloud berdasarkan Kategori Sentimen**
# ----------------------------------------------------------------
def generate_wordcloud(comments, labels, label_encoder):
    """Save one word-cloud image per sentiment category.

    Comments are grouped by their decoded label (compared case-insensitively
    against "positif", "netral" and "negatif"); comments with any other label
    are ignored. A category without usable text produces no image.

    Args:
        comments: Iterable of comment texts, aligned with ``labels``.
        labels: Encoded labels accepted by ``label_encoder.inverse_transform``.
        label_encoder: Fitted encoder used to decode ``labels``.
    """
    logging.info("📌 Membuat WordCloud berdasarkan kategori sentimen...")

    # Decode the labels back to their original form.
    labels = label_encoder.inverse_transform(labels)

    # Group the comments by label.
    categories = {
        "Positif": [],
        "Netral": [],
        "Negatif": []
    }
    buckets_by_label = {name.lower(): bucket for name, bucket in categories.items()}

    for comment, label in zip(comments, labels):
        # str() keeps non-string labels from crashing on .lower().
        bucket = buckets_by_label.get(str(label).lower())
        if bucket is None:
            continue
        # Skip missing values (NaN) and blank comments: " ".join() only
        # accepts strings, and blank text contributes no words.
        if isinstance(comment, str) and comment.strip():
            bucket.append(comment)

    # A distinct colour map per category.
    colormaps = {
        "Positif": "Greens",
        "Netral": "Blues",
        "Negatif": "Reds"
    }

    # Build and save a word cloud for every category that has text.
    for category, texts in categories.items():
        if not texts:
            continue

        text_combined = " ".join(texts)
        wordcloud = WordCloud(width=800, height=400, background_color="white", colormap=colormaps[category]).generate(text_combined)

        # Plot the image.
        plt.figure(figsize=(10, 5))
        plt.imshow(wordcloud, interpolation="bilinear")
        plt.axis("off")
        plt.title(f"WordCloud - {category}", fontsize=14)

        # Save the image and release the figure.
        wordcloud_path = predict_path / f"wordcloud_{category.lower()}.png"
        plt.savefig(wordcloud_path, bbox_inches="tight")
        plt.close()

        logging.info(f"📊 WordCloud {category} disimpan di {wordcloud_path}")


##### Preprocessing data

In [None]:
# preprocessing()

##### Training TFIDF


In [None]:
# Split the preprocessed data 80/20 and build the TF-IDF exports from the training part.
training_tfidf(train_size=0.8)

##### Training SVM


In [None]:
# Train and save one SVM per supported kernel; each call writes <kernel>.pkl.
training_svm('linear')
training_svm('rbf')
training_svm('poly')
training_svm('sigmoid')

##### Evaluasi model

In [None]:
# Evaluate every saved model on the test split and write the reports and plots.
evaluate_model()