In [None]:
import os
import re
import spacy
import numpy as np
import spacy
import networkx as nx

from bangla_stemmer.stemmer import stemmer

# Corpus locations, relative to the notebook's working directory.
human_dir = "dataset/bn_human_vs_ai_corpus/human_written"
llm_dir = "dataset/bn_human_vs_ai_corpus/llm_generated"
# Bengali pipeline created with spacy.blank(); build_isg() calls it on raw text.
# NOTE(review): a blank pipeline presumably carries only a tokenizer (no tagger
# or parser), in which case token.pos_ / token.dep_ stay empty and every token
# is its own head — confirm before relying on the dependency-based graph.
nlp = spacy.blank("bn")

Data Preprocessing

In [None]:
# Bengali stop words: tokens that preprocess_text() drops from every document
bengali_stopwords = {
    "এবং", "তাহা", "উপর", "হয়", "করে", "যায়", "হতে", "এই", "তা", "তার", "হয়েছে", "ছিল",
    "কিন্তু", "তাকে", "আমরা", "আপনি", "তাদের", "সব", "অনেক", "কিছু", "কখনো", "একটি", "এটি",
}

# Preprocess text function
def preprocess_text(text):
    """Clean a raw document and return its stemmed, stop-word-free tokens.

    Characters outside the Bengali Unicode block (U+0980-U+09FF) other than
    whitespace are deleted, the remainder is split on whitespace, each word is
    stemmed, and stop words are removed.

    Args:
        text: Raw document string.

    Returns:
        List of stemmed tokens in document order.
    """
    # Strip everything that is neither whitespace nor inside the Bengali block
    # (Latin letters, ASCII digits and punctuation all fall outside it).
    text = re.sub(r"[^\u0980-\u09FF\s]", "", text)

    # Build the stemmer once and reuse it on later calls instead of
    # constructing a new BanglaStemmer for every document.
    # NOTE(review): assumes BanglaStemmer keeps no per-call state — confirm.
    stmr = getattr(preprocess_text, "_stemmer", None)
    if stmr is None:
        stmr = preprocess_text._stemmer = stemmer.BanglaStemmer()

    tokens = []
    for word in text.split():
        # The stop-word list holds surface forms, so test the raw word before
        # stemming can alter it ...
        if word in bengali_stopwords:
            continue
        stemmed = stmr.stem(word)
        # ... and still drop words whose stem is itself a stop word.
        if stemmed not in bengali_stopwords:
            tokens.append(stemmed)
    return tokens

# Load text data
def load_texts(folder):
    """Read every ``.txt`` file directly inside ``folder`` as UTF-8 text.

    Files are read in sorted filename order. ``os.listdir`` returns entries in
    arbitrary order, which would make the document order (and anything derived
    from it, such as a seeded train/test split) differ between machines.

    Args:
        folder: Path of the directory to scan (not recursive).

    Returns:
        List with the full contents of each ``.txt`` file, one string per file.
    """
    texts = []
    for filename in sorted(os.listdir(folder)):
        if not filename.endswith(".txt"):
            continue
        with open(os.path.join(folder, filename), "r", encoding="utf-8") as f:
            texts.append(f.read())
    return texts

# Load data
# Both corpora are read fully into memory: one string per .txt file.
human_texts = load_texts(human_dir)
llm_texts = load_texts(llm_dir)

Train Word2Vec Model

In [None]:
from gensim.models import Word2Vec

# Train Word2Vec models
def train_word2vec(texts, vector_size=100, window=5, min_count=2):
    """Fit a Word2Vec model on a collection of raw documents.

    Each document is run through ``preprocess_text`` and the resulting token
    list is used as one training sentence.

    Args:
        texts: Iterable of raw document strings.
        vector_size: Passed through to ``Word2Vec(vector_size=...)``.
        window: Passed through to ``Word2Vec(window=...)``.
        min_count: Passed through to ``Word2Vec(min_count=...)``.

    Returns:
        The trained ``Word2Vec`` model.
    """
    corpus = list(map(preprocess_text, texts))
    return Word2Vec(
        sentences=corpus,
        vector_size=vector_size,
        window=window,
        min_count=min_count,
    )

# One embedding model per class, each fit on that class's entire corpus.
# NOTE(review): the train/test split happens later in the notebook, so these
# models have already seen the documents that end up in the test set; the
# reported scores may therefore be optimistic.
human_w2v_model = train_word2vec(human_texts)
llm_w2v_model = train_word2vec(llm_texts)

Extract Word2Vec Features

In [None]:
# Document embedding using Word2Vec
def get_document_embedding(tokens, model):
    """Represent a tokenised document as the mean of its known word vectors.

    Tokens that are not in ``model.wv`` are skipped.

    Args:
        tokens: Iterable of token strings.
        model: Object exposing ``wv`` (supports ``in`` and indexing by token)
            and ``vector_size``.

    Returns:
        The element-wise mean of the found vectors, or a zero vector of length
        ``model.vector_size`` when no token is in the vocabulary.
    """
    known_vectors = [model.wv[tok] for tok in tokens if tok in model.wv]
    if not known_vectors:
        # Nothing recognised: fall back to an all-zero embedding
        return np.zeros(model.vector_size)
    return np.mean(known_vectors, axis=0)

def extract_word2vec_features(text, human_model, llm_model):
    """Embed one document with both Word2Vec models and join the results.

    The text is preprocessed once; the same tokens are then averaged under
    ``human_model`` and under ``llm_model``.

    Returns:
        1-D array: the human-model embedding followed by the LLM-model one.
    """
    tokens = preprocess_text(text)
    per_model = [
        get_document_embedding(tokens, w2v)
        for w2v in (human_model, llm_model)
    ]
    return np.concatenate(per_model)


Build and Extract Syntactic Graph Features

In [None]:
# Build ISG for a document
def build_isg(text):
    """Build a directed head -> dependent graph from the tokens of ``text``.

    The text is run through the module-level ``nlp`` pipeline. Nodes are keyed
    by token text, so repeated words share a single node whose ``pos``/``dep``
    attributes come from the last occurrence. An edge is added from a token's
    head to the token unless the token is its own head.

    NOTE(review): if ``nlp`` has no dependency parser (it is created with
    ``spacy.blank``), every token is presumably its own head and the graph
    ends up with no edges — confirm.

    Returns:
        ``networkx.DiGraph`` with ``pos``/``dep`` node attributes and a
        ``relation`` edge attribute.
    """
    graph = nx.DiGraph()

    for tok in nlp(text):
        graph.add_node(tok.text, pos=tok.pos_, dep=tok.dep_)
        parent = tok.head
        if parent != tok:
            graph.add_edge(parent.text, tok.text, relation=tok.dep_)

    return graph

# # Extract graph features
# def extract_graph_features(G):
#     features = {}
#     degree_sequence = [G.degree(n) for n in G.nodes()]
#     features['avg_degree'] = np.mean(degree_sequence) if degree_sequence else 0
#     features['max_degree'] = np.max(degree_sequence) if degree_sequence else 0
#     features['min_degree'] = np.min(degree_sequence) if degree_sequence else 0
#     features['edge_density'] = nx.density(G) if len(G.nodes()) > 0 else 0
#     features['clustering_coefficient'] = nx.average_clustering(G) if len(G.nodes()) > 0 else 0
#     return list(features.values())


# Function to compute the H-index based on node degrees
def h_index(G):
    """Return the largest h such that at least h nodes have degree >= h.

    ``G`` only needs a ``degree()`` method yielding ``(node, degree)`` pairs.
    """
    ranked = sorted((deg for _, deg in G.degree()), reverse=True)
    # Degrees are non-increasing while the rank grows, so the positions that
    # satisfy deg >= rank form a prefix; counting them gives its length.
    return sum(1 for rank, deg in enumerate(ranked, start=1) if deg >= rank)

# Extract graph features
def extract_graph_features(G):
    """Compute a fixed-length list of structural features for a graph.

    The graph is converted to an undirected copy with self-loops removed; the
    caller's graph is not modified. Features are returned in this order:
    avg_degree, max_degree, min_degree, num_edges, density, radius, diameter,
    circumference, girth, chromatic_number, degeneracy, clustering_coeff,
    global_clustering_coeff, h_index.

    Undefined values are reported as -1 (0 for the degree, density and
    clustering features), in particular:
      * radius / diameter / circumference / girth on a disconnected or empty
        graph;
      * girth of an acyclic graph (``nx.girth`` returns ``inf``, which
        scikit-learn estimators refuse as input);
      * circumference when the installed NetworkX has no ``circumference``
        function.

    Args:
        G: A NetworkX graph (directed or undirected).

    Returns:
        List of 14 numeric feature values.
    """
    G = G.to_undirected()
    # Materialise the self-loop list first so the graph is not mutated while
    # the generator is still being consumed.
    G.remove_edges_from(list(nx.selfloop_edges(G)))

    has_nodes = G.number_of_nodes() > 0
    # nx.is_connected raises NetworkXPointlessConcept on a graph with no
    # nodes, so only ask when there is at least one; evaluate it once.
    connected = has_nodes and nx.is_connected(G)

    features = {}
    degree_sequence = [deg for _, deg in G.degree()]

    # Degree features
    features['avg_degree'] = np.mean(degree_sequence) if degree_sequence else 0
    features['max_degree'] = np.max(degree_sequence) if degree_sequence else 0
    features['min_degree'] = np.min(degree_sequence) if degree_sequence else 0

    # Graph properties
    features['num_edges'] = G.number_of_edges()
    features['density'] = nx.density(G) if has_nodes else 0

    # Radius and diameter are the min and max eccentricity; compute the
    # eccentricities once instead of once per metric.
    if connected:
        eccentricities = nx.eccentricity(G).values()
        features['radius'] = min(eccentricities)
        features['diameter'] = max(eccentricities)
    else:
        features['radius'] = -1
        features['diameter'] = -1

    # Look the function up defensively: calling a missing nx.circumference
    # would raise AttributeError for every connected graph.
    circumference_fn = getattr(nx, "circumference", None)
    if connected and circumference_fn is not None:
        features['circumference'] = circumference_fn(G)
    else:
        features['circumference'] = -1

    girth = nx.girth(G) if connected else -1
    # Acyclic graphs have infinite girth; keep the feature vector finite.
    features['girth'] = -1 if girth == float("inf") else girth

    # Chromatic features (greedy colouring gives an upper bound)
    features['chromatic_number'] = len(set(nx.coloring.greedy_color(G).values())) if has_nodes else -1

    # Other graph properties
    features['degeneracy'] = max(nx.core_number(G).values()) if has_nodes else -1
    features['clustering_coeff'] = nx.average_clustering(G) if has_nodes else 0
    features['global_clustering_coeff'] = nx.transitivity(G) if has_nodes else 0
    features['h_index'] = h_index(G) if has_nodes else -1

    return list(features.values())

def extract_features_with_graph(text, human_model, llm_model):
    """Build the full feature vector for one document.

    Returns:
        1-D array: the Word2Vec features from ``extract_word2vec_features``
        followed by the graph features of ``build_isg(text)``.
    """
    embedding_part = extract_word2vec_features(text, human_model, llm_model)
    structure_part = extract_graph_features(build_isg(text))
    return np.concatenate([embedding_part, structure_part])


Extract Features

In [None]:
# Extract features for all texts
# Each list element is the vector returned by extract_features_with_graph();
# both classes are featurised with the same pair of Word2Vec models.
human_features = [extract_features_with_graph(text, human_w2v_model, llm_w2v_model) for text in human_texts]
llm_features = [extract_features_with_graph(text, human_w2v_model, llm_w2v_model) for text in llm_texts]

# Labels: 0 = Human, 1 = LLM
human_labels = [0] * len(human_features)
llm_labels = [1] * len(llm_features)

# Combine features and labels
# Rows are ordered human documents first, then LLM documents; labels follow
# the same order so features[i] pairs with labels[i].
features = np.array(human_features + llm_features)
labels = np.array(human_labels + llm_labels)


Train-Test Split

In [None]:
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report

# Train-test split
# 20% of the rows are held out for testing; random_state=42 fixes the shuffle.
# NOTE(review): no stratify= argument is passed, so class proportions in the
# two splits are left to chance — consider stratify=labels.
X_train, X_test, y_train, y_test = train_test_split(features, labels, test_size=0.2, random_state=42)

# Train classifier
# Default RandomForestClassifier hyper-parameters; only the seed is pinned.
clf = RandomForestClassifier(random_state=42)
clf.fit(X_train, y_train)

# Evaluate classifier
# y_pred is reused by the evaluation cell below.
y_pred = clf.predict(X_test)
print(classification_report(y_test, y_pred))


In [None]:
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix, ConfusionMatrixDisplay
import matplotlib.pyplot as plt

# Function to evaluate the model
def evaluate_model(y_true, y_pred):
    """Print and return accuracy, precision, recall and F1 for predictions.

    Each metric is computed with scikit-learn's default arguments.

    Returns:
        Tuple ``(accuracy, precision, recall, f1)``.
    """
    scores = (
        accuracy_score(y_true, y_pred),
        precision_score(y_true, y_pred),
        recall_score(y_true, y_pred),
        f1_score(y_true, y_pred),
    )

    print("Evaluation Metrics:")
    captions = ("Accuracy", "Precision", "Recall", "F1 Score")
    for caption, value in zip(captions, scores):
        print(f"{caption}: {value:.4f}")
    return scores

# Plot confusion matrix
def plot_confusion_matrix(y_true, y_pred, labels=["Human-written", "LLM-generated"]):
    """Draw the confusion matrix of ``y_pred`` against ``y_true``.

    Args:
        y_true: Ground-truth class labels.
        y_pred: Predicted class labels.
        labels: Tick labels shown on the plot, passed as ``display_labels``.
    """
    ConfusionMatrixDisplay(
        confusion_matrix=confusion_matrix(y_true, y_pred),
        display_labels=labels,
    ).plot(cmap=plt.cm.Blues, values_format='d')
    plt.title("Confusion Matrix")
    plt.show()

# Evaluate the model
# Uses y_test / y_pred produced by the train-test split cell above.
accuracy, precision, recall, f1 = evaluate_model(y_test, y_pred)

# Plot the confusion matrix
plot_confusion_matrix(y_test, y_pred)


In [None]:
# Classify a new document
def classify_new_document(text, classifier, human_model, llm_model):
    """Classify one raw document as human-written or LLM-generated.

    The text is featurised with ``extract_features_with_graph`` and passed to
    ``classifier.predict`` as a single-row batch.

    Returns:
        ``"Human-written"`` when the predicted label is 0, otherwise
        ``"LLM-generated"``.
    """
    feature_row = extract_features_with_graph(text, human_model, llm_model)
    predicted_label = classifier.predict([feature_row])[0]
    if predicted_label == 0:
        return "Human-written"
    return "LLM-generated"

# Function to classify a single txt file
def classify_single_document(file_path, classifier, human_model, llm_model):
    """Read a UTF-8 text file, classify its contents and print the verdict.

    Nothing is returned; the result is only printed.
    """
    with open(file_path, 'r', encoding='utf-8') as handle:
        document = handle.read()
    verdict = classify_new_document(document, classifier, human_model, llm_model)
    print(f"File: {file_path} - Classification: {verdict}")

# Define the file path to the txt file
# NOTE(review): this path points inside the llm_generated folder that
# load_texts() reads, so if the file exists it was part of the data used
# above; it is not an unseen document.
file_path = "dataset/bn_human_vs_ai_corpus/llm_generated/shapure_choto_golpo_ai.txt"  # Replace with the actual file path

# Call the function to classify the document
classify_single_document(file_path, clf, human_w2v_model, llm_w2v_model)

Integrated Syntactic Graph (ISG)

In [None]:
import os
import re
import spacy
import networkx as nx
import matplotlib.pyplot as plt
from bangla_stemmer.stemmer import stemmer

# Create a blank Bengali spaCy pipeline (spacy.blank builds an empty pipeline
# rather than loading a trained model).
# NOTE(review): nothing in this cell calls nlp — confirm whether it is needed.
nlp = spacy.blank("bn")  # Rebinds the nlp created at the top of the notebook

# Bengali stop words removed during preprocessing (a small hand-picked set;
# swap in a more comprehensive list if needed)
bengali_stopwords = {
    "এবং", "তাহা", "উপর", "হয়", "করে", "যায়", "হতে", "এই", "তা", "তার", "হয়েছে", "ছিল",
    "কিন্তু", "তাকে", "আমরা", "আপনি", "তাদের", "সব", "অনেক", "কিছু", "কখনো", "একটি", "এটি",
}

# Preprocess text function
def preprocess_text(text, verbose=False):
    """Clean a raw document and return its stemmed, stop-word-free tokens.

    Characters outside the Bengali Unicode block (U+0980-U+09FF) other than
    whitespace are deleted, the remainder is split on whitespace, each word is
    stemmed, and stop words are removed.

    Args:
        text: Raw document string.
        verbose: When True, print the raw token list before stemming. Off by
            default so that processing a whole folder does not flood the
            notebook output.

    Returns:
        List of stemmed tokens in document order.
    """
    # Keep only Bengali characters and whitespace
    text = re.sub(r"[^\u0980-\u09FF\s]", "", text)

    raw_tokens = text.split()
    if verbose:
        print(raw_tokens)

    # Build the stemmer once and reuse it on later calls instead of
    # constructing a new BanglaStemmer for every document.
    # NOTE(review): assumes BanglaStemmer keeps no per-call state — confirm.
    stmr = getattr(preprocess_text, "_stemmer", None)
    if stmr is None:
        stmr = preprocess_text._stemmer = stemmer.BanglaStemmer()

    tokens = []
    for word in raw_tokens:
        # The stop-word list holds surface forms, so test the raw word before
        # stemming can alter it ...
        if word in bengali_stopwords:
            continue
        stemmed = stmr.stem(word)
        # ... and still drop words whose stem is itself a stop word.
        if stemmed not in bengali_stopwords:
            tokens.append(stemmed)
    return tokens

# Build Integrated Syntactic Graph (ISG)
def build_isg(cleaned_tokens):
    """Build a directed word-adjacency graph from a preprocessed token list.

    Every token becomes a node; because nodes are keyed by the token itself,
    repeated tokens share one node and its ``position`` attribute ends up
    holding the index of the last occurrence. An edge runs from each token to
    the token that follows it.

    NOTE(review): this rebinds the name ``build_isg`` defined earlier in the
    notebook, which takes raw text instead of a token list; earlier cells
    re-run after this one will pick up this version.

    Returns:
        ``networkx.DiGraph`` of consecutive-token edges.
    """
    G = nx.DiGraph()

    previous = None
    for position, token in enumerate(cleaned_tokens):
        G.add_node(token, position=position)
        if position > 0:
            # Link the preceding token to the current one
            G.add_edge(previous, token)
        previous = token

    return G

# Plot the graph with cleaner visualization
def plot_graph(G, title="Graph Visualization", max_nodes=500, weight_threshold=None):
    """Draw ``G`` with a spring layout using small, unlabeled nodes.

    Args:
        G: NetworkX graph to draw.
        title: Figure title.
        max_nodes: If the graph has more nodes than this, only the subgraph
            induced by the first ``max_nodes`` nodes (in ``G.nodes`` order)
            is drawn.
        weight_threshold: When given and the graph has ``weight`` edge
            attributes, only edges with weight strictly above the threshold
            are kept.
    """
    plt.figure(figsize=(12, 8))

    # Trim oversized graphs to their first max_nodes nodes
    if len(G.nodes) > max_nodes:
        G = G.subgraph(list(G.nodes)[:max_nodes])

    # Optional weight filter; skipped when no edge carries a 'weight'
    if weight_threshold is not None and nx.get_edge_attributes(G, 'weight'):
        heavy_edges = [
            (src, dst)
            for src, dst, attrs in G.edges(data=True)
            if attrs.get('weight', 0) > weight_threshold
        ]
        G = G.edge_subgraph(heavy_edges)

    layout = nx.spring_layout(G)

    draw_options = dict(
        with_labels=False,
        node_color="skyblue",
        node_size=50,       # small nodes
        edge_color="gray",  # light-colored edges
        width=0.5,          # thin edges
        alpha=0.7,          # slight transparency
    )
    nx.draw(G, layout, **draw_options)

    plt.title(title)
    plt.show()

# # Print graph properties
# def print_graph_properties(G):
#     print(f"Graph Properties:")
#     print(f"- Number of nodes: {G.number_of_nodes()}")
#     print(f"- Number of edges: {G.number_of_edges()}")
#     print(f"- Average degree: {sum(dict(G.degree()).values()) / G.number_of_nodes():.2f}")
#     print(f"- Is graph connected? {'Yes' if nx.is_connected(G.to_undirected()) else 'No'}")
#     print(f"- Diameter (if connected): {nx.diameter(G.to_undirected()) if nx.is_connected(G.to_undirected()) else 'N/A'}")
#     print()


# Function to compute the H-index based on node degrees
def h_index(G):
    """Return the largest h such that at least h nodes have degree >= h.

    ``G`` only needs a ``degree()`` method yielding ``(node, degree)`` pairs.
    """
    ordered = sorted((deg for _, deg in G.degree()), reverse=True)
    h = 0
    # Advance while the (h+1)-th largest degree is still at least h+1
    while h < len(ordered) and ordered[h] >= h + 1:
        h += 1
    return h

# Print graph properties
def print_graph_properties(G):
    """Print a summary of structural properties of ``G``.

    Node count, edge count and average degree are taken from ``G`` as given.
    All other metrics are computed on an undirected copy with self-loops
    removed. Metrics that require a connected graph print ``N/A`` when the
    graph is disconnected. An empty graph prints only the counts, because
    every other metric is undefined for it (the average degree would divide
    by zero and ``nx.is_connected`` raises on a graph with no nodes).

    Args:
        G: NetworkX graph (directed or undirected).
    """
    n_nodes = G.number_of_nodes()
    print(f"Graph Properties:")
    print(f"- Number of nodes: {n_nodes}")
    print(f"- Number of edges: {G.number_of_edges()}")
    if n_nodes == 0:
        print("- Graph is empty; remaining properties are undefined")
        print()
        return
    print(f"- Average degree: {sum(dict(G.degree()).values()) / n_nodes:.2f}")

    # Convert to undirected graph for connectivity checks and related properties
    G_undirected = G.to_undirected()
    # Materialise the self-loop list so the graph is not mutated while the
    # generator is still being consumed.
    G_undirected.remove_edges_from(list(nx.selfloop_edges(G_undirected)))

    # Connectivity is needed by several lines below; compute it once.
    connected = nx.is_connected(G_undirected)

    def if_connected(metric):
        """Evaluate ``metric`` on the undirected graph, or 'N/A' if disconnected."""
        return metric(G_undirected) if connected else 'N/A'

    print(f"- Is graph connected? {'Yes' if connected else 'No'}")
    print(f"- Diameter (if connected): {if_connected(nx.diameter)}")
    print(f"- Radius (if connected): {if_connected(nx.radius)}")
    print(f"- Density: {nx.density(G_undirected):.4f}")
    print(f"- Clustering coefficient: {nx.average_clustering(G_undirected):.4f}")
    print(f"- Girth (if connected): {if_connected(nx.girth)}")
    # Previously this printed the number of connected components, which is
    # always 1 for a connected graph; report the actual vertex connectivity.
    print(f"- Vertex connectivity: {if_connected(nx.node_connectivity)}")
    print(f"- Edge connectivity: {if_connected(nx.edge_connectivity)}")
    print(f"- Degeneracy: {max(nx.core_number(G_undirected).values())}")
    print(f"- Global clustering coefficient: {nx.transitivity(G_undirected)}")
    print(f"- H-index: {h_index(G_undirected)}")
    print()

# Process all text files in a given folder
def process_folder(folder_path, label):
    """Build, plot and describe a token graph for every ``.txt`` file in a folder.

    For each file the text is preprocessed, turned into a graph with
    ``build_isg``, plotted, and summarised with ``print_graph_properties``.
    Files are handled in sorted filename order so the output is the same on
    every run (``os.listdir`` order is arbitrary).

    Args:
        folder_path: Directory to scan (not recursive).
        label: Human-readable corpus name used in printed messages and titles.

    Returns:
        List of ``(filename, graph)`` tuples, one per processed file.
    """
    graphs = []
    print(f"Processing {label} files in folder: {folder_path}\n")
    for filename in sorted(os.listdir(folder_path)):
        if not filename.endswith(".txt"):
            continue

        file_path = os.path.join(folder_path, filename)
        # Only the read needs the file handle; close it before the slower
        # stemming, graph construction and plotting.
        with open(file_path, "r", encoding="utf-8") as file:
            text = file.read()

        # Preprocess text
        cleaned_tokens = preprocess_text(text)
        print(f"File: {filename}")
        print(f"Cleaned tokens: {cleaned_tokens}\n")

        # Build graph
        G = build_isg(cleaned_tokens)
        graphs.append((filename, G))

        # Plot graph
        plot_graph(G, title=f"Graph for {label}: {filename}")

        # Print graph properties
        print(f"Graph properties for {filename}:")
        print_graph_properties(G)
    return graphs

# Main folder structure
# Same corpus layout as human_dir / llm_dir at the top of the notebook.
base_folder = "dataset/bn_human_vs_ai_corpus"
human_folder = os.path.join(base_folder, "human_written")
llm_folder = os.path.join(base_folder, "llm_generated")

# Process human-written and LLM-generated text files
# Each call prints the token list, draws one figure and prints graph
# properties per file, and returns a list of (filename, graph) tuples.
human_graphs = process_folder(human_folder, label="Human-written")
llm_graphs = process_folder(llm_folder, label="LLM-generated")