Initial setup: downloads and dataset loading

In [None]:
import os
# --- GLOBAL PATH CONFIGURATION ---
DRIVE_FOLDER = "/content/drive/MyDrive/Esame_Complexity"

# Create the output folder only when its parent (the Drive root) already
# exists. Calling os.makedirs unconditionally would build a *local*
# /content/drive/MyDrive tree while Drive is still unmounted, and the
# Drive-mounting cell treats the presence of that path as "already mounted".
if os.path.isdir(os.path.dirname(DRIVE_FOLDER)):
    try:
        os.makedirs(DRIVE_FOLDER, exist_ok=True)
    except OSError as exc:
        # Best effort: report the problem instead of silently hiding it.
        print(f"Warning: could not create {DRIVE_FOLDER}: {exc}")

# Logical dataset name -> file name of the tab-separated source file.
datasets = {
    'ose_adv_ele': 'OSE_adv_ele.csv',
    'ose_adv_int': 'OSE_adv_int.csv',
    'swipe': 'swipe.csv',
    'vikidia': 'vikidia.csv'
}

In [None]:
import sys
!{sys.executable} -m pip install nltk
!{sys.executable} -m pip install textcomplexity
!{sys.executable} -m pip install stanza
!{sys.executable} -m pip install wordfreq
!{sys.executable} -m spacy download en_core_web_md
!{sys.executable} -m pip install tqdm spacy numpy

In [None]:
# Standard library imports
import json
from collections import Counter
from functools import lru_cache
from pprint import pprint
from typing import Dict, Set, Iterable, Optional, Any, Tuple
import importlib.resources as pkg_resources

# Third-party imports
import numpy as np
import pandas as pd
import nltk
from nltk.corpus import wordnet as wn
import spacy
import stanza
import textcomplexity  # only used to access en.json
from tqdm.auto import tqdm

# Download required resources: the English stanza models and the two NLTK
# corpora that back the WordNet interface imported above as `wn`.
stanza.download('en')
nltk.download('wordnet')
nltk.download('omw-1.4')

# Make sure WordNet is available; if not, download it.
# The lookup below is only a probe: its result is discarded.
try:
    _ = wn.synsets("dog")
except LookupError:
    nltk.download("wordnet")
    nltk.download("omw-1.4")

# Load spaCy model with the "ner" and "textcat" components disabled.
nlp = spacy.load("en_core_web_md", disable=["ner", "textcat"])
# `spacy_nlp` is a second name for the same pipeline object, so the
# "sentencizer" component added here is also part of `nlp`.
spacy_nlp = nlp
spacy_nlp.add_pipe("sentencizer")


# Stanza pipeline cache
@lru_cache(maxsize=None)  # memoise one pipeline per language code
def get_stanza_pipeline(lang: str):
    """Return the stanza pipeline for ``lang``, building it on first use.

    Only English is supported. The pipeline runs the constituency parser
    (needed for the CS metric) and the dependency parser (needed for MDD).

    Raises:
        ValueError: if ``lang`` is anything other than ``'en'``.
    """
    if lang != 'en':
        raise ValueError(f"Unsupported language: {lang}")
    return stanza.Pipeline(lang='en', processors='tokenize,pos,lemma,depparse,constituency')

# Universal POS tags counted as content words by lexical_measures_from_doc
# (lexical density and lexical sophistication). Proper nouns are included.
CONTENT_UPOS = {"NOUN", "PROPN", "ADJ", "VERB", "ADV"}

# POS tags counted as content words by the discourse-complexity helpers
# (see is_content_token). Unlike CONTENT_UPOS, proper nouns are excluded.
CONTENT_POS =  {"NOUN", "VERB", "ADJ", "ADV"}

In [None]:
# Logical dataset name -> file name of the tab-separated source file.
datasets = {
    'ose_adv_ele': 'OSE_adv_ele.csv',
    'ose_adv_int': 'OSE_adv_int.csv',
    'swipe': 'swipe.csv',
    'vikidia': 'vikidia.csv',
}

def load_data(path):
    """Read the tab-separated file at ``path`` into a pandas DataFrame."""
    # read_table uses the tab character as its default field separator.
    return pd.read_table(path)


def load_dataset(name):
    """Load the dataset registered under ``name`` in the ``datasets`` mapping.

    Raises:
        ValueError: if ``name`` is not a key of ``datasets``.
    """
    if name in datasets:
        return load_data(datasets[name])
    raise ValueError(f"Dataset {name} not found")

In [None]:
# Load one dataset and preview its first three rows.
df = load_dataset('ose_adv_ele')
df.head(3)

In [None]:
# Draw one random row and print its two text versions one after the other.
row = df.sample(1)

print('SIMPLE TEXT')
print(row['Simple'].iloc[0])
print('-'*100)  # visual separator between the two versions
print('COMPLEX TEXT')
print(row['Complex'].iloc[0])

In [None]:
# Print the number of rows of every registered dataset and the grand total.
# NOTE: `df` is rebound on each iteration and ends up holding the last dataset.
cnt = 0
for name in datasets:  # only the name is needed; load_dataset resolves the file
    df = load_dataset(name)
    print(f"{name}: {df.shape[0]} rows")
    cnt += df.shape[0]
print(f"Total: {cnt} rows")

In [None]:
# Point `df` back at 'ose_adv_ele': the row-count loop in the previous cell
# rebinds `df` on every iteration.
df = load_dataset('ose_adv_ele')

In [None]:
# Cache stanza pipelines to avoid re-loading models.
# Keyed by language code and filled lazily by get_stanza_pipeline().
_STANZA_PIPELINES: Dict[str, stanza.Pipeline] = {}

# UPOS tags considered content words (C); proper nouns are included.
CONTENT_UPOS = {"NOUN", "PROPN", "VERB", "ADJ", "ADV"}


@lru_cache()
def load_cow_top5000_en() -> Set[str]:
    """
    Return the reference word list used for lexical sophistication.

    The list is read from the ``most_common`` entry of textcomplexity's
    English language definition file (en.json), described there as the
    5,000 most frequent English content words (COW-based). Each entry is a
    ``[word, xpos]`` pair; the tag is dropped and the word is lowercased.
    """
    definition_file = pkg_resources.files(textcomplexity).joinpath("en.json")
    with definition_file.open("r", encoding="utf-8") as handle:
        language_definition = json.load(handle)

    return {word.lower() for word, _tag in language_definition["most_common"]}


def get_stanza_pipeline(lang: str = "en", use_gpu: bool = True) -> stanza.Pipeline:
    """
    Return the stanza Pipeline for ``lang``, creating and caching it on first use.

    The cache (``_STANZA_PIPELINES``) is keyed by language only, so ``use_gpu``
    takes effect just for the call that actually builds the pipeline.

    NOTE: the models must have been downloaded beforehand, e.g.:
        import stanza
        stanza.download('en')
    """
    pipeline = _STANZA_PIPELINES.get(lang)
    if pipeline is None:
        pipeline = stanza.Pipeline(
            lang=lang,
            processors="tokenize,pos,lemma,depparse,constituency",
            use_gpu=use_gpu,
            tokenize_no_ssplit=False,
        )
        _STANZA_PIPELINES[lang] = pipeline
    return pipeline

Lexical Complexity

In [None]:
def _compute_mtld(tokens: Iterable[str], ttr_threshold: float = 0.72) -> Optional[float]:
    """
    Compute MTLD (Measure of Textual Lexical Diversity) for a list of tokens.

    MTLD = total_number_of_tokens / number_of_factors

    A factor is a contiguous segment where the running TTR stays >= threshold.
    When the TTR drops below the threshold, we close a factor (at the previous
    token) and start a new one. At the end, the remaining partial segment is
    counted as a fractional factor, with weight proportional to how close the
    final TTR is to the threshold.
    """
    tokens = [tok for tok in tokens if tok]
    if not tokens:
        return None

    types = set()
    factor_count = 0.0
    token_count_in_factor = 0

    for tok in tokens:
        token_count_in_factor += 1
        types.add(tok)
        ttr = len(types) / token_count_in_factor

        if ttr < ttr_threshold:
            factor_count += 1.0
            types = set()
            token_count_in_factor = 0

    # final partial factor
    if token_count_in_factor > 0:
        final_ttr = len(types) / token_count_in_factor
        if final_ttr < 1.0:
            fractional = (1.0 - final_ttr) / (1.0 - ttr_threshold)
            fractional = max(0.0, min(1.0, fractional))
            factor_count += fractional

    if factor_count == 0:
        return None

    return len(tokens) / factor_count



def _compute_lexical_density(total_tokens: int, content_tokens: int) -> Optional[float]:
    """
    LD = |C| / |T|
    where:
        |C| = number of content-word tokens
        |T| = total number of non-punctuation tokens
    """
    if total_tokens == 0:
        return None
    return content_tokens / total_tokens


def _compute_lexical_sophistication_cow(
    content_forms: Iterable[str],
    cow_top5000: set,
) -> Optional[float]:
    """
    LS = |{ w in C : w not in R }| / |C|
    where:
        C = content-word tokens (surface forms, lowercased)
        R = COW top-5000 content word forms (lowercased)
    """
    forms = [f for f in content_forms if f]
    if not forms:
        return None

    off_list = sum(1 for f in forms if f not in cow_top5000)
    return off_list / len(forms)


@lru_cache(maxsize=1)
def load_cow_top5000_en() -> Set[str]:
    """
    Load the COW top-5000 English content word forms from textcomplexity's
    English language definition file (en.json).

    The words live under the ``most_common`` key as ``[word, xpos]`` pairs;
    only the lowercased word is kept. A flat ``cow_top5000`` list of words is
    still accepted as a fallback. Reading a key the file does not contain
    would yield an empty set and make every content word count as
    "sophisticated".

    Returns:
        The set of lowercased reference word forms (cached after first call).
    """
    # Use importlib.resources.files for modern package data access
    json_path = pkg_resources.files('textcomplexity').joinpath('en.json')
    with json_path.open('r', encoding='utf-8') as f:
        data = json.load(f)

    entries = data.get("most_common") or data.get("cow_top5000", [])
    forms: Set[str] = set()
    for entry in entries:
        # An entry is either a bare word or a [word, tag] pair.
        word = entry if isinstance(entry, str) else entry[0]
        forms.add(word.lower())
    return forms


def lexical_measures_from_doc(doc) -> Dict[str, Optional[float]]:
    """
    Derive the three lexical measures from a parsed stanza Document.

    Punctuation (UPOS "PUNCT") and words without any usable form are skipped.
    MTLD is computed over lowercased lemmas (falling back to the surface
    text), LD over the counts of content vs. all kept words, and LS over the
    lowercased surface forms of the content words (UPOS in CONTENT_UPOS).

    Returns:
        {"MTLD": ..., "LD": ..., "LS": ...}; each value may be None.
    """
    reference_forms = load_cow_top5000_en()

    lemmas = []           # one lowercased lemma per kept word
    content_surface = []  # lowercased surface form of each content word
    n_content = 0

    all_words = (w for sentence in doc.sentences for w in sentence.words)
    for w in all_words:
        if w.upos == "PUNCT":
            continue

        lemma = (w.lemma or w.text or "").lower()
        if not lemma:
            continue
        lemmas.append(lemma)

        if w.upos in CONTENT_UPOS:
            n_content += 1
            content_surface.append((w.text or "").lower())

    return {
        "MTLD": _compute_mtld(lemmas) if lemmas else None,
        # Every kept word contributes exactly one lemma, so len(lemmas) is |T|.
        "LD": _compute_lexical_density(len(lemmas), n_content),
        "LS": _compute_lexical_sophistication_cow(content_surface, reference_forms),
    }


def lexical_measures_from_text(text: str, lang: str = "en") -> Dict[str, Optional[float]]:
    """
    Parse one text with stanza and return its lexical measures.

    ``None`` is treated as an empty string and any other value is converted
    with ``str``. Blank input is not parsed: all three measures are None.
    """
    normalized = "" if text is None else str(text)

    if normalized.strip():
        parsed = get_stanza_pipeline(lang)(normalized)
        return lexical_measures_from_doc(parsed)

    return {"MTLD": None, "LD": None, "LS": None}



def compute_lexical_measures_df(
    df: pd.DataFrame,
    column: str = "text",
    lang: str = "en",
) -> Dict[str, Dict[Any, Optional[float]]]:
    """
    Run the lexical measures over every value of ``df[column]``.

    Returns:
        One mapping per measure, keyed by the DataFrame index label:
        {
            "MTLD": {index: value},
            "LD":   {index: value},
            "LS":   {index: value},
        }
    """
    results: Dict[str, Dict[Any, Optional[float]]] = {"MTLD": {}, "LD": {}, "LS": {}}

    for row_label, raw_text in df[column].items():
        measures = lexical_measures_from_text(raw_text, lang=lang)
        for key, per_row in results.items():
            per_row[row_label] = measures[key]

    return results

Syntactic Complexity

In [None]:

def mdd_from_doc(doc) -> Optional[float]:
    """
    Mean Dependency Distance (MDD) of a stanza Document.

    Per sentence, the distance of a word is |id - head|; words whose head is
    None or 0 (the root) are left out:
        MDD_i = (1 / |D_i|) * sum_{(h,d) in D_i} |h - d|
    The document score is the plain average of the sentence scores:
        MDD = (1 / k) * sum_i MDD_i
    taken over the sentences that have at least one dependency. Returns None
    when no sentence qualifies.
    """
    per_sentence = []

    for sentence in doc.sentences:
        spans = [
            abs(word.id - word.head)
            for word in sentence.words
            if not (word.head is None or word.head == 0)
        ]
        if spans:
            per_sentence.append(sum(spans) / len(spans))

    if per_sentence:
        return sum(per_sentence) / len(per_sentence)
    return None



def _count_clauses_in_tree(tree) -> int:
    """
    Count clause nodes in a constituency tree.

    A simple and standard heuristic (PTB-style) is:
        count all nodes whose label starts with 'S'
        (S, SBAR, SBARQ, SINV, SQ, etc.).

    This aligns with the idea of counting finite and subordinate clauses
    as in Hunt (1965) and later complexity work.
    """
    if tree is None:
        return 0

    # Stanza's constituency tree: tree.label, tree.children
    count = 1 if getattr(tree, "label", "").startswith("S") else 0

    for child in getattr(tree, "children", []):
        # leaves can be strings or terminals without 'label'
        if hasattr(child, "label"):
            count += _count_clauses_in_tree(child)

    return count


def cs_from_doc(doc) -> Optional[float]:
    """
    Clauses per sentence (CS) of a stanza Document.

        CS = (1 / k) * sum_i L_i

    L_i is the clause count that ``_count_clauses_in_tree`` reports for the
    constituency tree of sentence s_i. Sentences without a constituency tree
    are left out of both the sum and k. Returns None when no sentence has a
    tree.
    """
    parses = (getattr(sentence, "constituency", None) for sentence in doc.sentences)
    per_sentence = [
        _count_clauses_in_tree(parse)
        for parse in parses
        if parse is not None
    ]

    if per_sentence:
        return sum(per_sentence) / len(per_sentence)
    return None



def syntactic_measures_from_doc(doc) -> Dict[str, Optional[float]]:
    """
    Bundle the two syntactic measures of a stanza Document.

    Returns:
        {"MDD": mean dependency distance, "CS": clauses per sentence};
        either value may be None.
    """
    return {"MDD": mdd_from_doc(doc), "CS": cs_from_doc(doc)}


def syntactic_measures_from_text(text: str, lang: str = "en") -> Dict[str, Optional[float]]:
    """
    Parse one text with stanza and return its syntactic measures.

    ``None`` is treated as an empty string and any other value is converted
    with ``str``. Blank input is not parsed: both measures are None.
    """
    normalized = "" if text is None else str(text)

    if normalized.strip():
        parsed = get_stanza_pipeline(lang)(normalized)
        return syntactic_measures_from_doc(parsed)

    return {"MDD": None, "CS": None}


def compute_syntactic_measures_df(
    df: pd.DataFrame,
    column: str = "text",
    lang: str = "en",
) -> Dict[str, Dict[Any, Optional[float]]]:
    """
    Run the syntactic measures over every value of ``df[column]``.

    Returns:
        One mapping per measure, keyed by the DataFrame index label:
        {
            "MDD": {index: value},
            "CS":  {index: value},
        }
    """
    results: Dict[str, Dict[Any, Optional[float]]] = {"MDD": {}, "CS": {}}

    for row_label, raw_text in df[column].items():
        measures = syntactic_measures_from_text(raw_text, lang=lang)
        for key, per_row in results.items():
            per_row[row_label] = measures[key]

    return results

Discourse complexity

In [None]:

# Approximate set of content POS tags (spaCy universal POS).
# Checked against tok.pos_ in is_content_token; proper nouns are not included.
CONTENT_POS =  {"NOUN", "VERB", "ADJ", "ADV"}


def is_content_token(tok):
    """
    Tell whether ``tok`` counts as a content word.

    A token qualifies when it is alphabetic, is not a stopword and its POS
    tag is one of CONTENT_POS; punctuation and other non-alphabetic tokens
    therefore never qualify.
    """
    if not tok.is_alpha:
        return False
    if tok.is_stop:
        return False
    return tok.pos_ in CONTENT_POS


@lru_cache(maxsize=100000)
def get_related_lemmas(lemma):
    """
    Collect the WordNet lemmas that are semantically related to ``lemma``.

    For every synset of the (lowercased) lemma the result gathers:
      - synonyms (the synset's own lemmas) and their antonyms
      - hypernyms and hyponyms
      - part / member / substance meronyms
      - coordinate terms (other hyponyms of the synset's hypernyms)

    Names are lowercased and underscores are turned into spaces. The query
    lemma itself is removed from the result. Troponyms are not used because
    NLTK's Synset has no ``troponyms()`` method.

    The returned set is cached; callers should treat it as read-only.
    """
    lemma = lemma.lower()

    def names_of(synset):
        # Normalised lemma names of one synset.
        return {entry.name().lower().replace("_", " ") for entry in synset.lemmas()}

    related = set()

    for synset in wn.synsets(lemma):
        # Synonyms and, for each of them, their antonyms.
        for entry in synset.lemmas():
            related.add(entry.name().lower().replace("_", " "))
            related.update(
                opposite.name().lower().replace("_", " ")
                for opposite in entry.antonyms()
            )

        parents = synset.hypernyms()

        # Neighbouring synsets whose lemma names are all taken over.
        neighbours = list(parents)
        neighbours += synset.hyponyms()
        neighbours += synset.part_meronyms()
        neighbours += synset.member_meronyms()
        neighbours += synset.substance_meronyms()
        for parent in parents:
            # Coordinate terms: siblings under the same hypernym.
            neighbours += [sib for sib in parent.hyponyms() if not (sib == synset)]

        for neighbour in neighbours:
            related |= names_of(neighbour)

    related.discard(lemma)
    return related


def lexical_cohesion_single(text, nlp):
    """
    Lexical Cohesion (LC) of one document:

        LC = |C| / m

    |C| counts cohesive links between every pair of sentences: each content
    lemma the two sentences share (repetition), plus each lemma of the later
    sentence that WordNet relates to a lemma of the earlier one. m is the
    number of alphabetic tokens in the whole document.

    Returns 0.0 for non-string or blank input, for documents without
    alphabetic tokens, and when fewer than two sentences contain content
    lemmas.
    """
    if not (isinstance(text, str) and text.strip()):
        return 0.0

    doc = nlp(text)

    # Denominator m: all alphabetic tokens.
    alpha_total = len([tok for tok in doc if tok.is_alpha])
    if alpha_total == 0:
        return 0.0

    sents = list(doc.sents)
    if len(sents) < 2:
        # Cross-sentence cohesion needs at least two sentences.
        return 0.0

    # One set of content lemmas per sentence; sentences without any are dropped.
    lemma_sets = []
    for sent in sents:
        bag = {tok.lemma_.lower() for tok in sent if is_content_token(tok)}
        if bag:
            lemma_sets.append(bag)

    if len(lemma_sets) < 2:
        return 0.0

    links = 0
    for position, earlier in enumerate(lemma_sets):
        for later in lemma_sets[position + 1:]:
            # 1) Repetition: lemmas present in both sentences.
            links += len(earlier & later)
            # 2) WordNet relations from the earlier to the later sentence.
            links += sum(len(get_related_lemmas(item) & later) for item in earlier)

    return float(links) / float(alpha_total)


def sentence_vector(sent, vector_size):
    """
    Average the word vectors of a sentence.

    Tokens without a vector, punctuation and whitespace tokens are ignored.
    When nothing is left, a float32 zero vector of length ``vector_size`` is
    returned instead.
    """
    usable = []
    for tok in sent:
        if not tok.has_vector or tok.is_punct or tok.is_space:
            continue
        usable.append(tok.vector)

    if usable:
        return np.mean(usable, axis=0)
    return np.zeros(vector_size, dtype="float32")


def coherence_single(text, nlp):
    """
    Coherence (CoH) of one document: the mean cosine similarity between the
    vectors of adjacent sentences,

        CoH = (1 / (k-1)) * sum_{i=1}^{k-1} cos(h_i, h_{i+1})

    where h_i is the averaged word vector of sentence i. Pairs in which one
    of the two vectors has zero norm are left out of the mean.

    Returns 0.0 for non-string or blank input, for documents with fewer than
    two sentences, and when no pair could be scored.

    Raises:
        ValueError: if the spaCy model has no word vectors.
    """
    if not isinstance(text, str) or not text.strip():
        return 0.0

    dim = nlp.vocab.vectors_length
    if dim == 0:
        raise ValueError(
            "The loaded spaCy model does not contain word vectors "
            "(nlp.vocab.vectors_length == 0). "
            "Use a model like 'en_core_web_md' or similar."
        )

    sents = list(nlp(text).sents)
    if len(sents) < 2:
        # No adjacent pair to compare.
        return 0.0

    vectors = [sentence_vector(sent, dim) for sent in sents]

    similarities = []
    for left, right in zip(vectors, vectors[1:]):
        scale = np.linalg.norm(left) * np.linalg.norm(right)
        if scale == 0.0:
            # Cosine is undefined when either vector is all zeros.
            continue
        similarities.append(float(np.dot(left, right) / scale))

    return float(np.mean(similarities)) if similarities else 0.0



def compute_lexical_cohesion_vector(df, nlp, column="text"):
    """
    Score every row of ``df[column]`` with ``lexical_cohesion_single``.

    Parameters
    ----------
    df : pandas.DataFrame
        DataFrame holding the texts.
    nlp : spaCy Language object
        Pre-loaded spaCy pipeline (lemmatizer, POS tagger, sentence splitting).
    column : str, default "text"
        Column that contains the text. Missing values are scored as "".

    Returns
    -------
    np.ndarray
        1D float32 array of LC scores, in row order, length == len(df).
    """
    cleaned = df[column].fillna("").astype(str)
    scores = []
    for entry in cleaned:
        scores.append(lexical_cohesion_single(entry, nlp))
    return np.array(scores, dtype="float32")


def compute_coherence_vector(df, nlp, column="text"):
    """
    Score every row of ``df[column]`` with ``coherence_single``.

    Parameters
    ----------
    df : pandas.DataFrame
        DataFrame holding the texts.
    nlp : spaCy Language object
        Pre-loaded spaCy pipeline with word vectors.
    column : str, default "text"
        Column that contains the text. Missing values are scored as "".

    Returns
    -------
    np.ndarray
        1D float32 array of CoH scores, in row order, length == len(df).
    """
    cleaned = df[column].fillna("").astype(str)
    scores = []
    for entry in cleaned:
        scores.append(coherence_single(entry, nlp))
    return np.array(scores, dtype="float32")


def compute_discourse_measures(df, nlp, column="text"):
    """
    Compute both discourse measures for every row of ``df[column]``.

    Returns
    -------
    dict
        {
            "LC":  np.ndarray of lexical cohesion scores,
            "CoH": np.ndarray of coherence scores
        }
        Both arrays follow the row order of ``df``.
    """
    return {
        "LC": compute_lexical_cohesion_vector(df, nlp, column=column),
        "CoH": compute_coherence_vector(df, nlp, column=column),
    }

Calculate Complexity Values

In [None]:

def _analyze_text_all(text: str, lang: str = "en") -> Dict[str, Optional[float]]:
    """
    Parse a text with stanza and compute all measures (lexical + syntactic)
    in a single pass.

    Returns a dict with keys:
        "MTLD", "LD", "LS", "MDD", "CS"
    (Discourse measures LC/CoH are added later at DataFrame level, via spaCy.)
    """
    if text is None:
        text = ""
    text = str(text)

    if not text.strip():
        return {"MTLD": None, "LD": None, "LS": None, "MDD": None, "CS": None}

    nlp = get_stanza_pipeline(lang)
    doc = nlp(text)

    lex = lexical_measures_from_doc(doc)
    syn = syntactic_measures_from_doc(doc)

    out: Dict[str, Optional[float]] = {}
    out.update(lex)
    out.update(syn)
    return out


def compute_all_complexity_measures_df(
    df: pd.DataFrame,
    column: str = "text",
    lang: str = "en",
    spacy_nlp=None,
) -> Dict[str, Dict[Any, Optional[float]]]:
    """
    Compute all complexity measures for each row in df[column].

    Rows are processed sequentially: first the stanza-based lexical and
    syntactic measures, then the spaCy-based discourse measures.

    Args
    ----
    df : pandas.DataFrame
        DataFrame with a text column.
    column : str, default "text"
        Name of the text column.
    lang : str, default "en"
        Language code for stanza.
    spacy_nlp : spaCy Language, required for LC / CoH
        Pre-loaded spaCy pipeline with:
            - POS / lemmatizer for LC
            - word vectors for CoH (e.g. 'en_core_web_md').

    Returns
    -------
    dict
        {
            "MTLD": {index: value},
            "LD":   {index: value},
            "LS":   {index: value},
            "MDD":  {index: value},
            "CS":   {index: value},
            "LC":   {index: value},
            "CoH":  {index: value},
        }

    Raises
    ------
    ValueError
        If ``spacy_nlp`` is None.
    """
    # Validate up front: failing only after the (slow) stanza pass would
    # throw away all the work already done.
    if spacy_nlp is None:
        raise ValueError(
            "spacy_nlp must be provided to compute LC and CoH. "
            "Load a spaCy model with vectors, e.g. 'en_core_web_md', and "
            "pass it as spacy_nlp=..."
        )

    stanza_keys = ("MTLD", "LD", "LS", "MDD", "CS")
    results: Dict[str, Dict[Any, Optional[float]]] = {key: {} for key in stanza_keys}

    items = list(df[column].items())  # list[(index, text)]

    # ---- Lexical + syntactic (stanza) ----
    for idx, text in tqdm(
        items,
        total=len(items),
        desc="Computing lexical & syntactic complexity (sequential)",
    ):
        metrics = _analyze_text_all(text, lang=lang)
        for key in stanza_keys:
            results[key][idx] = metrics[key]

    # ---- Discourse measures (spaCy: LC & CoH) ----
    discourse = compute_discourse_measures(df, spacy_nlp, column=column)

    # The discourse arrays follow the row order of df; map them back to the
    # DataFrame index labels.
    for key in ("LC", "CoH"):
        results[key] = {
            idx: float(value) for idx, value in zip(df.index, discourse[key])
        }

    return results

In [None]:
"""
Example script: load a DataFrame and compute all complexity measures.
"""

# Runs when the cell/script is executed directly, not when imported.
if __name__ == "__main__":

    # Fixed random_state, so the same 5 rows are drawn on every run.
    df_example = df.sample(n=5, random_state=12) # We sample 5 random rows
    # Compute all measures for Simple texts
    metrics = compute_all_complexity_measures_df(
        df_example,
        column="Simple", # "Simple" selects the simplified texts; use 'Complex' for the original ones.
        lang="en",

        spacy_nlp=spacy_nlp
    )

    # metrics maps each measure name to a {row index: value} dict.
    print("All complexity measures (per row):")
    pprint(metrics)

Drive mounting

In [None]:
from google.colab import drive
import os

# Mount Google Drive unless it looks mounted already: the existence of
# '/content/drive/MyDrive' is used as the "already mounted" signal.
if not os.path.exists('/content/drive/MyDrive'):
    print("Mounting Google Drive...")
    drive.mount('/content/drive')
else:
    print("Google Drive is already mounted.")

# Create a dedicated folder so the results do not get mixed up with other Drive files
save_path = "/content/drive/MyDrive/Esame_Complexity"
if not os.path.exists(save_path):
    try:
        os.makedirs(save_path)
        print(f"Created directory: {save_path}")
    except Exception as e:
        # Report the failure instead of raising, so the cell still completes.
        print(f"Error creating directory {save_path}: {e}")

Test

In [None]:
import os

def run_sanity_check(n_rows=2):
    """
    Smoke-test the whole setup before launching the long computation.

    Checks, in order: Google Drive is mounted, the first dataset file exists,
    the stanza and spaCy measures run on one sample text, and a file can be
    written to (and removed from) DRIVE_FOLDER. Progress and errors are
    printed; nothing is returned. The first two checks stop the test early.

    Args:
        n_rows: number of rows loaded from the first dataset for the test.
    """
    print("=== AVVIO TEST DI FUNZIONAMENTO ===")

    # 1. Drive must be mounted
    drive_mounted = os.path.exists("/content/drive/MyDrive")
    if not drive_mounted:
        print("❌ ERRORE: Google Drive non montato. Esegui la cella di mount.")
        return
    print("✅ Google Drive: Connesso")

    # 2. The source file of the first registered dataset must be present
    first_ds_name, first_ds_path = list(datasets.items())[0]
    if not os.path.exists(first_ds_path):
        print(f"❌ ERRORE: Non trovo il file {first_ds_path}. Carica la cartella data_sampled.")
        return
    print(f"✅ File sorgente trovati ({first_ds_name})")

    # 3. Small-scale computation test
    print(f"\nTentativo di calcolo su {n_rows} righe...")
    sample_rows = load_dataset(first_ds_name).head(n_rows)

    try:
        sample_text = sample_rows['Simple'].iloc[0]

        # One text through the stanza-based measures
        print("Test Stanza... ", end="")
        lexical_measures_from_doc(get_stanza_pipeline('en')(sample_text))
        print("OK")

        # The same text through the spaCy-based measures
        print("Test spaCy... ", end="")
        lexical_cohesion_single(sample_text, spacy_nlp)
        print("OK")

        # 4. Write permission on Drive: create a probe file, then delete it
        probe_path = os.path.join(DRIVE_FOLDER, "test_permessi.txt")
        with open(probe_path, "w") as f:
            f.write("Test scrittura riuscito")
        print("✅ Test scrittura su Drive: OK")
        os.remove(probe_path)

        print("\n=== TUTTO FUNZIONA CORRETTAMENTE ===")
        print("Puoi procedere con l'esecuzione della funzione principale.")

    except Exception as e:
        # Any failure in steps 3-4 is reported rather than raised.
        print(f"\n❌ ERRORE DURANTE IL TEST: {e}")
        print("Controlla le funzioni di calcolo o la connessione GPU.")

# Run the sanity check (with the default of 2 rows)
run_sanity_check()

In [None]:
import os
import pandas as pd
import numpy as np
from tqdm.auto import tqdm

# --- DATASET CONFIGURATION ---
# Only the uncommented entries are processed by the computation loop below;
# this rebinds the `datasets` mapping used by load_dataset().
datasets = {
    #'ose_adv_ele': 'OSE_adv_ele.csv',
    #'ose_adv_int': 'OSE_adv_int.csv',
    #'swipe': 'swipe.csv',
    'vikidia': 'vikidia.csv'
}

# Output folder on Google Drive (same path as configured in the first cell)
DRIVE_FOLDER = "/content/drive/MyDrive/Esame_Complexity"

def load_data(path):
    """Read the tab-separated file at ``path`` into a pandas DataFrame."""
    # read_table uses the tab character as its default field separator.
    return pd.read_table(path)

def load_dataset(name):
    """Load the dataset registered under ``name`` in the ``datasets`` mapping.

    Raises:
        ValueError: if ``name`` is not a key of ``datasets``.
    """
    if name in datasets:
        return load_data(datasets[name])
    raise ValueError(f"Dataset {name} not found")

# --- AUTOMATED COMPUTATION BLOCK (RESULTS STORED ON DRIVE) ---
# For every configured dataset: compute the seven measures for both text
# columns, checkpointing progress to Drive so an interrupted run can resume,
# then write a final file that keeps only the fully computed rows.

if __name__ == "__main__":
    text_columns = ['Simple', 'Complex']
    metrics_list = ["MTLD", "LD", "LS", "MDD", "CS", "LC", "CoH"]

    for nome_ds in datasets.keys():
        print(f"\n{'='*60}")
        print(f"ELABORAZIONE DATASET: {nome_ds.upper()}")

        # Checkpoint and final output both live in the Drive folder
        checkpoint_path = os.path.join(DRIVE_FOLDER, f"checkpoint_{nome_ds}.csv")
        final_output_path = os.path.join(DRIVE_FOLDER, f"final_complexity_{nome_ds}.csv")

        # 1. Skip the dataset if its final file already exists
        if os.path.exists(final_output_path):
            print(f"Risultato finale già presente su Drive. Salto al prossimo dataset.")
            continue

        # 2. Resume from the Drive checkpoint, or start from the original file
        if os.path.exists(checkpoint_path):
            print(f"Ripristino progresso dal file di Drive...")
            working_df = pd.read_csv(checkpoint_path, sep='\t')
        else:
            print(f"Nessun checkpoint. Caricamento file originale...")
            working_df = load_dataset(nome_ds)
            # Add one empty (NaN) column per text column / metric pair,
            # named "<text column>_<metric>".
            for t_col in text_columns:
                for m in metrics_list:
                    col_name = f"{t_col}_{m}"
                    if col_name not in working_df.columns:
                        working_df[col_name] = np.nan

        # 3. Computation loop: one full pass over the rows per text column
        for t_col in text_columns:
            current_metric_cols = [f"{t_col}_{m}" for m in metrics_list]

            for idx, row in tqdm(working_df.iterrows(), total=len(working_df), desc=f"{nome_ds} ({t_col})"):

                # A row whose metrics for this text column are all non-null is already done: skip it
                if not working_df.loc[idx, current_metric_cols].isnull().any():
                    continue

                # Blank texts and missing values (whose str() is 'nan') are left uncomputed
                text = str(row[t_col])
                if not text.strip() or text == 'nan':
                    continue

                try:
                    # Stanza analysis: one parse feeds both the lexical and the syntactic measures
                    doc_stanza = get_stanza_pipeline('en')(text)

                    lex = lexical_measures_from_doc(doc_stanza)
                    working_df.at[idx, f"{t_col}_MTLD"] = lex.get("MTLD")
                    working_df.at[idx, f"{t_col}_LD"]   = lex.get("LD")
                    working_df.at[idx, f"{t_col}_LS"]   = lex.get("LS")

                    syn = syntactic_measures_from_doc(doc_stanza)
                    working_df.at[idx, f"{t_col}_MDD"]  = syn.get("MDD")
                    working_df.at[idx, f"{t_col}_CS"]   = syn.get("CS")

                    # spaCy analysis (discourse measures)
                    working_df.at[idx, f"{t_col}_LC"] = lexical_cohesion_single(text, spacy_nlp)
                    working_df.at[idx, f"{t_col}_CoH"] = coherence_single(text, spacy_nlp)

                except Exception as e:
                    # Best effort: report the failing row and move on to the next one
                    print(f"\n[ERRORE] Dataset {nome_ds}, Idx {idx}: {e}")

                # Safety save to Drive whenever the row index is a multiple of 20
                # (rows skipped by the `continue` statements above never reach this point)
                if idx % 20 == 0:
                    working_df.to_csv(checkpoint_path, sep='\t', index=False)

            # Save once more after finishing this text column
            working_df.to_csv(checkpoint_path, sep='\t', index=False)

        # 4. Finalisation: drop every row that still has a missing metric
        print(f"Generazione file finale pulito per {nome_ds}...")
        all_metric_cols = [f"{tc}_{m}" for tc in text_columns for m in metrics_list]

        final_df = working_df.dropna(subset=all_metric_cols).copy()
        # Keep only the two text columns and the metric columns
        final_df = final_df[text_columns + all_metric_cols]

        # Final save to Drive
        final_df.to_csv(final_output_path, sep='\t', index=False)
        print(f"✓ SUCCESSO: {final_output_path}")

    print("\n" + "="*60)
    print("TUTTI I DATASET SONO STATI SALVATI SUL TUO DRIVE!")
    print("="*60)