In [1]:
import os
import re
import math
import statistics
from collections import Counter
from typing import List, Union

import nltk
import numpy as np
import pandas as pd
import spacy
import textstat
from nltk.corpus import stopwords
from nltk.tokenize import sent_tokenize, word_tokenize
from scipy.stats import entropy
from tqdm import tqdm

# Fetch the NLTK resources this notebook relies on (tokenizer models, POS
# tagger data, stopword lists). Already-installed packages are reported as
# up to date by NLTK.
for nltk_resource in ("punkt", "averaged_perceptron_tagger", "stopwords"):
    nltk.download(nltk_resource)

# Shared SpaCy pipeline used by nlp_features for POS, entity and dependency features
nlp = spacy.load("en_core_web_sm")

[nltk_data] Downloading package punkt to /home/majkel/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package averaged_perceptron_tagger to
[nltk_data]     /home/majkel/nltk_data...
[nltk_data]   Package averaged_perceptron_tagger is already up-to-
[nltk_data]       date!
[nltk_data] Downloading package stopwords to /home/majkel/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


In [2]:
def d_metric(string):
    """Return the probability that two distinct whitespace tokens are the same word.

    The text is split on whitespace; for token frequencies ``c_i`` and a total
    of ``n`` tokens the value is ``sum(c_i * (c_i - 1)) / (n * (n - 1))``.
    Texts with fewer than two tokens yield ``0.0``.
    """
    tokens = string.split()
    total = len(tokens)
    # The denominator n * (n - 1) is zero for fewer than two tokens.
    if total < 2:
        return 0.0
    _, frequencies = np.unique(tokens, return_counts=True)
    repeated_pairs = np.sum(frequencies * (frequencies - 1))
    return repeated_pairs / (total * (total - 1))

def lexical_features(text: str) -> dict:
    """Compute size and lexical-diversity features for a single text.

    Tokens and sentences come from NLTK's ``word_tokenize`` / ``sent_tokenize``.
    With ``N`` tokens and ``V`` distinct (case-sensitive) tokens:

    * ``TTR``  = V / N
    * ``RTTR`` = V / sqrt(N)      (root type-token ratio)
    * ``CTTR`` = V / sqrt(2 * N)  (corrected type-token ratio)
    * ``Mass`` = (log10 N - log10 V) / (log10 N) ** 2

    Args:
        text: The raw text to analyse.

    Returns:
        A dict mapping feature name to value. Ratio features fall back to 0
        when the text has no tokens; ``Mass`` falls back to 0 for fewer than
        two tokens because ``log10(1)`` would make the denominator zero.
    """
    words = word_tokenize(text)
    sentences = sent_tokenize(text)
    stop_words = set(stopwords.words("english"))

    n_tokens = len(words)
    n_types = len(set(words))
    n_chars = sum(len(w) for w in words)
    n_stopwords = sum(1 for w in words if w.lower() in stop_words)

    if n_tokens > 1:
        log_tokens = np.log10(n_tokens)
        mass = (log_tokens - np.log10(n_types)) / (log_tokens ** 2)
    else:
        mass = 0

    return {
        "word_count": n_tokens,
        "character_count": n_chars,
        "average_word_length": n_chars / n_tokens if n_tokens else 0,
        "sentence_count": len(sentences),
        "TTR": n_types / n_tokens if n_tokens else 0,
        # Root TTR divides the type count by sqrt(tokens); the previous
        # sqrt(types) / tokens form was not the RTTR formula.
        "RTTR": n_types / np.sqrt(n_tokens) if n_tokens else 0,
        "CTTR": n_types / ((n_tokens * 2) ** 0.5) if n_tokens else 0,
        # NOTE(review): d_metric splits the raw text on whitespace, so it is
        # computed on a different tokenization than the features above.
        "DMetric": d_metric(text),
        "Mass": mass,
        "stopword_ratio": n_stopwords / n_tokens if n_tokens else 0,
    }


def nlp_features(text: str) -> dict:
    """Derive part-of-speech, sentence, entity, dependency and sentiment features.

    The text is parsed with the module-level SpaCy pipeline ``nlp``. Ratios are
    relative to the number of SpaCy tokens; sentence lengths are measured in
    whitespace-separated words of each sentence's text. Every feature falls
    back to 0 when there is nothing to measure.
    """
    doc = nlp(text)
    n_tokens = len(doc)
    pos_counts = Counter(token.pos_ for token in doc)

    def _pos_ratio(tag):
        # Share of tokens carrying the given coarse POS tag.
        return pos_counts.get(tag, 0) / n_tokens if n_tokens else 0

    # Sentence lengths are gathered once and reused for both statistics.
    sentence_lengths = [len(sentence.text.split()) for sentence in doc.sents]

    # NOTE(review): stock SpaCy pipelines do not appear to populate
    # token.sentiment, in which case both sentiment features stay 0 — confirm.
    nonzero_sentiments = [token.sentiment for token in doc if token.sentiment != 0]

    # Distance in token positions between each non-root token and its head.
    head_distances = [abs(token.head.i - token.i) for token in doc if token.head != token]

    # Deepest token measured by its number of ancestors in the dependency tree.
    deepest = max((sum(1 for _ in token.ancestors) for token in doc), default=0)

    return {
        "noun_ratio": _pos_ratio("NOUN"),
        "verb_ratio": _pos_ratio("VERB"),
        "adjective_ratio": _pos_ratio("ADJ"),
        "average_sentence_length": sum(sentence_lengths) / len(sentence_lengths) if sentence_lengths else 0,
        "std_sentence_length": statistics.pstdev(sentence_lengths) if sentence_lengths else 0,
        "entity_count": len(list(doc.ents)),
        "syntactic_depth": deepest,
        "dependency_distance": np.mean(head_distances) if head_distances else 0,
        "average_sentiment_score": np.mean(nonzero_sentiments) if nonzero_sentiments else 0,
        "sentiment_variability": np.std(nonzero_sentiments) if len(nonzero_sentiments) > 1 else 0,
    }


def readability_features(text: str) -> dict:
    """Score the text with five ``textstat`` readability formulas.

    Returns a dict keyed by feature name; each value is whatever the
    corresponding ``textstat`` function returns for ``text``.
    """
    scorers = (
        ("flesch_reading_ease", textstat.flesch_reading_ease),
        ("gunning_fog_index", textstat.gunning_fog),
        ("smog_index", textstat.smog_index),
        ("automated_readability_index", textstat.automated_readability_index),
        ("dale_chall_readability", textstat.dale_chall_readability_score),
    )
    return {feature_name: score(text) for feature_name, score in scorers}


def stylometric_features(text: str) -> dict:
    """Count basic punctuation and measure the entropy of the token distribution.

    ``punctuation_count`` is the number of ``. , ; ! ?`` characters in the raw
    text. ``entropy_score`` is ``scipy.stats.entropy`` applied to the
    frequency counts of the lower-cased NLTK tokens.
    """
    token_frequencies = Counter(word_tokenize(text.lower()))
    # Summing per-mark counts equals counting every character that is a mark.
    punctuation_total = sum(text.count(mark) for mark in ".,;!?")
    return {
        "punctuation_count": punctuation_total,
        "entropy_score": entropy(list(token_frequencies.values())),
    }


def discourse_features(text: str) -> dict:
    """Count discourse markers in the text, including multi-word markers.

    Single-word markers are matched against individual lower-cased tokens.
    Multi-word markers (e.g. "on the other hand") are matched as consecutive
    token sequences, because a phrase can never equal a single token.

    Args:
        text: The raw text to analyse.

    Returns:
        ``discourse_marker_count``: total number of marker occurrences.
        ``discourse_marker_ratio``: that count divided by the number of
        tokens, or 0 for a text without tokens.
    """
    markers = {"however", "therefore", "moreover", "nevertheless", "thus", "on the other hand"}
    words = word_tokenize(text)
    lowered = [w.lower() for w in words]

    single_word_markers = {m for m in markers if " " not in m}
    phrase_markers = [tuple(m.split()) for m in markers if " " in m]

    count = sum(1 for w in lowered if w in single_word_markers)
    for phrase in phrase_markers:
        span = len(phrase)
        # Slide a window of the phrase's length over the token stream.
        count += sum(
            1
            for start in range(len(lowered) - span + 1)
            if tuple(lowered[start:start + span]) == phrase
        )

    return {
        "discourse_marker_count": count,
        "discourse_marker_ratio": count / len(words) if words else 0,
    }



def repetition_features(text: str) -> dict:
    """Measure how often the text repeats n-grams and how many words occur once.

    Works on lower-cased NLTK tokens. A repetition ratio is
    ``1 - distinct_ngrams / total_ngrams`` (0 when the text is too short to
    form any n-gram). ``hapax_legomena_ratio`` is the number of tokens that
    occur exactly once divided by the total token count.
    """
    tokens = [token.lower() for token in word_tokenize(text)]

    def _repetition_ratio(size):
        # All consecutive windows of `size` tokens; empty if the text is shorter.
        windows = [tuple(tokens[start:start + size]) for start in range(len(tokens) - size + 1)]
        if not windows:
            return 0
        return 1 - len(set(windows)) / len(windows)

    occurrences = Counter(tokens)
    single_use = sum(1 for times_seen in occurrences.values() if times_seen == 1)

    return {
        "bigram_repetition_ratio": _repetition_ratio(2),
        "trigram_repetition_ratio": _repetition_ratio(3),
        "hapax_legomena_ratio": single_use / len(tokens) if tokens else 0,
    }


def syntactic_features(text: str) -> dict:
    """Approximate present-participle and passive-voice usage with surface patterns.

    ``present_participle_count`` counts NLTK tokens ending in "ing" (case
    insensitive), so it also counts words such as "thing".
    ``passive_voice_count`` counts matches of a "be"-form followed by a word
    ending in "ed"; irregular participles are not matched.
    """
    ing_tokens = [token for token in word_tokenize(text) if token.lower().endswith('ing')]
    passive_matches = re.findall(
        r"\b(was|were|is|are|been|being)\s+\w+ed\b",
        text,
        flags=re.IGNORECASE,
    )
    return {
        "present_participle_count": len(ing_tokens),
        "passive_voice_count": len(passive_matches),
    }


def cohesion_features(text: str) -> dict:
    """Count conjunctions, personal pronouns and contractions in the text.

    Words are matched case-insensitively on word boundaries, so occurrences at
    the start or end of the text, next to punctuation or line breaks, and
    directly repeated words are all counted. (Counting space-padded substrings
    missed all of those cases.)

    Contractions are matched case-insensitively and accept both the straight
    (') and the typographic (’) apostrophe.

    Args:
        text: The raw text to analyse.

    Returns:
        A dict with ``conjunction_count``, ``pronoun_count`` and
        ``contraction_count``.
    """
    conjunctions = ("and", "or", "but", "however", "because", "therefore")
    pronouns = ("i", "you", "he", "she", "they", "we", "it")
    contraction_patterns = (
        r"\b\w+n['’]t\b",
        r"\b\w+['’]re\b",
        r"\b\w+['’]ve\b",
        r"\b\w+['’]ll\b",
        r"\b\w+['’]d\b",
    )

    def _count_whole_words(vocabulary):
        # One alternation per vocabulary; \b keeps "or" from matching inside "for".
        pattern = r"\b(?:" + "|".join(re.escape(word) for word in vocabulary) + r")\b"
        return len(re.findall(pattern, text, flags=re.IGNORECASE))

    return {
        "conjunction_count": _count_whole_words(conjunctions),
        "pronoun_count": _count_whole_words(pronouns),
        "contraction_count": sum(
            len(re.findall(pattern, text, flags=re.IGNORECASE))
            for pattern in contraction_patterns
        ),
    }



def extract_features_single_text(text: str) -> dict:
    """Run every feature extractor on ``text`` and merge the results into one dict.

    Extractors are applied in a fixed order; if two extractors ever emitted the
    same key, the later one would win.
    """
    extractors = (
        lexical_features,
        nlp_features,
        readability_features,
        stylometric_features,
        discourse_features,
        repetition_features,
        syntactic_features,
        cohesion_features,
    )
    merged = {}
    for extractor in extractors:
        merged.update(extractor(text))
    return merged

In [3]:
def calc_features(texts):
    """Extract features for every text and return them as a DataFrame.

    Each row corresponds to one input text (in input order) and each column to
    one feature. Progress is reported through a tqdm bar.
    """
    feature_rows = [extract_features_single_text(text) for text in tqdm(texts)]
    return pd.DataFrame(feature_rows)

In [4]:
def get_csv_paths(folder_path, recursive=False):
    """Return the paths of all ``.csv`` files in a folder, sorted.

    Args:
        folder_path: Directory to search.
        recursive: When True, subdirectories are searched as well; otherwise
            only the files directly inside ``folder_path`` are considered.

    Returns:
        A sorted list of paths (each joined onto ``folder_path``). Sorting
        makes the processing order reproducible, since neither ``os.listdir``
        nor ``os.walk`` guarantees an order.
    """
    if recursive:
        # os.walk only reports regular entries in `files`, never directories.
        file_paths = [
            os.path.join(root, file)
            for root, _, files in os.walk(folder_path)
            for file in files
            if file.endswith('.csv')
        ]
    else:
        file_paths = []
        for file in os.listdir(folder_path):
            candidate = os.path.join(folder_path, file)
            # os.listdir also returns directories; skip any named "*.csv"
            # so both modes agree on what counts as a CSV file.
            if file.endswith('.csv') and os.path.isfile(candidate):
                file_paths.append(candidate)

    return sorted(file_paths)

In [5]:
def save_feature_stats(df, stats, data_path, save_path):
    """Aggregate feature columns and append the result to a master CSV.

    The source file name is expected to look like ``<data>_<model>.csv``. The
    model is the part after the LAST underscore, so dataset names may
    themselves contain underscores.

    Args:
        df: DataFrame of features (one column per feature).
        stats: Aggregations accepted by ``DataFrame.agg`` (names or callables).
        data_path: Path of the CSV the features were computed from; only its
            file name is used, to derive the ``data`` and ``model`` labels.
        save_path: CSV file to append to. The header row is written only when
            the file does not exist yet.

    Raises:
        ValueError: If the file name contains no underscore.
    """
    file_name = os.path.basename(data_path)
    data_name, separator, model = file_name.rpartition("_")
    if not separator:
        raise ValueError(
            f"Cannot derive data/model labels from {file_name!r}: expected '<data>_<model>.csv'"
        )
    model = model.removesuffix(".csv")

    df_stat = df.agg(stats).reset_index()
    df_stat["model"] = model
    df_stat["data"] = data_name
    df_stat = df_stat.rename(columns={"index": "stat"})

    # Append mode: emit the header only for a brand-new file.
    df_stat.to_csv(save_path, mode="a", index=False, header=not os.path.exists(save_path))

In [6]:
def percentile(n):
    """Build an aggregation function that returns the ``n``-quantile of its input.

    ``n`` is a fraction (e.g. 0.9). The returned function is renamed to
    ``percentile_XX`` (XX = ``n * 100`` rounded, zero-padded to two digits) so
    that each quantile gets its own distinguishable name.
    """
    def percentile_(series):
        return series.quantile(n)

    percentile_.__name__ = f"percentile_{n * 100:02.0f}"
    return percentile_

In [7]:
# Input locations (relative to the notebook's working directory)
DATA_HUMAN_PATH = "../data/data_human"
DATA_AI_PATH = "../data/data_ai"

# Output locations for per-file features and the aggregated statistics
FEATURES_PATH = "../data/features/"
FEATURES_STATS_PATH = "../data/features/features_stats_master.csv"

# Aggregations applied to every feature column: named pandas reductions
# followed by the nine deciles (0.1 ... 0.9).
STATS = [
    'mean', 'std', 'min', 'max', 'median', 'skew', 'kurtosis', 'var',
    *(percentile(decile / 10) for decile in range(1, 10)),
]

In [8]:
# Human CSVs are taken from the top level of their folder only; AI CSVs are
# collected from the whole directory tree.
paths = [
    *get_csv_paths(DATA_HUMAN_PATH),
    *get_csv_paths(DATA_AI_PATH, recursive=True),
]

In [None]:
for path in paths:
    path_parts = path.split("/")
    features_file = path_parts[-1].replace(".csv", "_features.csv")

    # Files ending in "_human.csv" keep one parent directory in the output
    # path; every other file keeps two.
    if path.split("_")[-1] == "human.csv":
        features_path = os.path.join(FEATURES_PATH, path_parts[-2], features_file)
    else:
        features_path = os.path.join(FEATURES_PATH, path_parts[-3], path_parts[-2], features_file)

    df = pd.read_csv(path)
    texts = df["text"].values
    df_features = calc_features(texts)

    # NOTE(review): persistence is disabled below, so the computed features
    # are discarded at the end of each iteration — re-enable before a real run.
    # df_features.to_csv(features_path, index=False)

    # save_feature_stats(df_features, STATS, path, FEATURES_STATS_PATH)

100%|██████████| 11580/11580 [03:40<00:00, 52.54it/s]
100%|██████████| 11581/11581 [11:12<00:00, 17.21it/s] 
  5%|▌         | 592/11581 [00:10<04:18, 42.57it/s]