In [None]:
#Libraries
import pandas as pd
import numpy as np
import re
import unicodedata
import string
import nltk
import spacy
from nltk.corpus import stopwords
import tensorflow as tf

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import confusion_matrix, roc_curve, auc, roc_auc_score
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.decomposition import TruncatedSVD
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout
from tensorflow.keras.utils import to_categorical

from collections import Counter
import random
import nltk
from nltk.corpus import wordnet
from functools import lru_cache

from nltk.tokenize import word_tokenize


from sklearn.utils.class_weight import compute_class_weight
from sklearn.preprocessing import LabelEncoder
from keras.utils import to_categorical
from keras.models import Sequential
from keras.layers import LSTM, Dense

from sklearn.metrics import confusion_matrix, classification_report

from functools import lru_cache



In [None]:
# Load Italian language model (Lemmatization)
nlp = spacy.load("it_core_news_sm")
nlp.max_length = 2000000  # Increase max length for large documents

# Define stopwords for Italian: NLTK's list plus corpus-specific filler words.
# NOTE: remove_stopwords compares whitespace-separated tokens, so the multi-word
# entries below ("deve essere", "ogni caso", "devono essere") can never match by
# themselves; they are kept only so the original list stays intact.
stop_words_ntlk = set(stopwords.words('italian'))
custom_stopwords = set(["quindi", "pertanto", "dunque", "deve essere", "ogni caso", "devono essere", "esempio", "infatti", "invece", "cioè", "tuttavia", "perché", "solo", "sempre", "così", "riguardo", "ovvero", "però", "comunque", "ancora", 
                        "peraltro", "stesso", "tanto", "poiché", "mentre", "essa", "inoltre", "punto", "quali", "stessa", "proprio", "esso", "perche", "cosi", "cioe", "deve", "essere", "devono", "poiche", "possono", "può", "puo", "poiche", "pero", "senso",
                        "particolare", "oltre", "basta", "secondo", "rispetto", "infine", "soltanto", "detto", "caso", "meno", "ragione", "quando", "basta", "nonche", "volta", "meno", "ossia", "tale", "tratta",
                        "gia", "pur", "tal", "poi","puo","cio"])


def _fold_to_ascii(word):
    """Strip diacritics the same way clean_text does (NFKD, then drop non-ASCII)."""
    return unicodedata.normalize('NFKD', word).encode('ASCII', 'ignore').decode('ASCII')


# clean_text folds every text to plain ASCII *before* stopwords are removed, so an
# accented entry such as "perché" can never equal the folded token "perche".
# Add the folded spelling of every stopword so that both forms are filtered.
stop_words = set(stop_words_ntlk).union(custom_stopwords)
stop_words |= {_fold_to_ascii(word) for word in stop_words}
stop_words.discard('')  # an entry made only of non-ASCII characters folds to ''

def clean_text(text):
    """Normalise a raw string into lowercase ASCII words separated by single spaces.

    Steps, in order: lowercase; fold accents to ASCII; remove URLs, e-mail
    addresses and legal references (e.g. "art. 123", "d.lgs. 50/2016");
    collapse whitespace; drop tokens containing '.' or '_'; delete hyphens,
    underscores, digits and anything that is not a-z or whitespace; finally
    drop tokens longer than 20 characters.

    Args:
        text: the string to clean.

    Returns:
        The cleaned string with no leading or trailing whitespace.
    """
    # Lowercase first, then decompose accented characters and discard the
    # non-ASCII remainder (e.g. "è" becomes "e").
    cleaned = unicodedata.normalize('NFKD', text.lower())
    cleaned = cleaned.encode('ASCII', 'ignore').decode('ASCII')

    # Regex passes that must run before token filtering: (pattern, replacement, flags).
    leading_passes = (
        (r'http\S+|www\S+|https\S+', '', re.MULTILINE),         # URLs
        (r'\S+@\S+', '', 0),                                    # e-mail addresses
        (r'\b(?:art\.|d\.lgs\.|l\.) \d+(?:/\d+)?', '', 0),      # legal references
        (r'[\n\t]+', ' ', 0),                                   # new lines and tabs
        (r'\s+', ' ', 0),                                       # runs of whitespace
    )
    for pattern, replacement, flags in leading_passes:
        cleaned = re.sub(pattern, replacement, cleaned, flags=flags)

    # Drop every token that contains a dot or an underscore.
    # NOTE(review): this also discards sentence-final words such as "danno."
    # together with abbreviations like "art." - confirm that is intended.
    surviving = [token for token in cleaned.split() if not ('.' in token or '_' in token)]
    cleaned = ' '.join(surviving)

    # Character-level removals: hyphens/underscores, digits, then anything
    # that is neither a lowercase ASCII letter nor whitespace.
    for pattern in (r'[-_]', r'\d+', r'[^a-z\s]'):
        cleaned = re.sub(pattern, '', cleaned)

    # Very long tokens are most likely extraction errors or glued-together text.
    surviving = [token for token in cleaned.split() if len(token) <= 20]
    return ' '.join(surviving).strip()

def remove_stopwords(text):
    """Return `text` without the whitespace-separated tokens found in `stop_words`.

    The surviving tokens are re-joined with single spaces.
    """
    kept_tokens = []
    for token in text.split():
        if token in stop_words:
            continue
        kept_tokens.append(token)
    return ' '.join(kept_tokens)

def lemmatize_text(text):
    """Run `text` through the spaCy pipeline `nlp` and return the lemmas joined by spaces."""
    lemmas = (token.lemma_ for token in nlp(text))
    return " ".join(lemmas)

def tokenize_text(text):
    """Return the surface form of every token the spaCy pipeline `nlp` finds in `text`.

    NOTE: a later cell in this notebook defines another `tokenize_text`
    (based on NLTK), which replaces this one once that cell has run.
    """
    surface_forms = []
    for token in nlp(text):
        surface_forms.append(token.text)
    return surface_forms

# Read the raw corpus from parquet
data = pd.read_parquet('SpecialisticMonotopic.parquet', engine='pyarrow')

# Rows without a topic or without a text cannot be used
data = data.dropna(subset=['topic', 'text'])

# Topics are only cleaned; texts go through clean -> stopword removal -> lemmatization
data['topic'] = data['topic'].apply(clean_text)
data['text'] = (
    data['text']
    .apply(clean_text)
    .apply(remove_stopwords)
    .apply(lemmatize_text)
)

# Blank out words of one to three characters
data['text'] = data['text'].apply(lambda doc: re.sub(r'\b\w{1,3}\b', '', doc))

# Identical (topic, text) pairs are kept only once
data = data.drop_duplicates(subset=['topic', 'text']).reset_index(drop=True)

# Keep only texts that still contain at least 10 words
data = data[data['text'].apply(lambda doc: len(doc.split()) >= 10)]

# Persist the cleaned corpus for the following cells
data.to_csv('SpecialisticMonotopic_cleaned_Lemmatized.csv', index=False)

print("Data cleaning, stopwords removal, and lemmatization completed.")


In [None]:
# Load the pre-cleaned data (the CSV written by the preprocessing cell), so the
# cells below can run without repeating the cleaning and lemmatization.
data = pd.read_csv('SpecialisticMonotopic_cleaned_Lemmatized.csv')

# Check the first few rows of the dataframe (last expression, so the cell displays it)
data.head()

In [None]:
# Distinct topic labels, in order of first appearance, and how many there are
unique_topics = data['topic'].unique()
num_unique_topics = len(unique_topics)

# Report both
print("Unique topics:")
print(unique_topics)
print(f"\nNumber of unique topics: {num_unique_topics}")


In [None]:
# Topics distribution: number of texts per topic
topic_counts = data['topic'].value_counts()

# One bar per topic on an explicitly created figure/axes pair
fig, ax = plt.subplots(figsize=(12, 6))
sns.barplot(x=topic_counts.index, y=topic_counts.values, ax=ax)

# Rotate x-axis labels so the topic names stay readable
plt.setp(ax.get_xticklabels(), rotation=90, ha='right')

# Axis labels and title
ax.set_xlabel('Topics')
ax.set_ylabel('Number of Texts')
ax.set_title('Distribution of Texts Across Topics')

# Adjust layout so the rotated labels are not cut off, then render
fig.tight_layout()
plt.show()

In [None]:
# Fetch the NLTK resources needed for the Italian WordNet lookups
for corpus_name in ('omw-1.4', 'wordnet'):
    nltk.download(corpus_name)

# Hold out 20% of the corpus as test data, keeping topic proportions (stratified)
train_data, test_data = train_test_split(
    data,
    test_size=0.2,
    random_state=42,
    stratify=data['topic'],
)

@lru_cache(maxsize=1000)
def get_italian_synonyms(word):
    """Return the Italian WordNet lemma names that share a synset with `word`.

    Args:
        word: the word to look up (`lang='ita'`).

    Returns:
        A sorted list of distinct lemma names other than `word` itself; empty
        when WordNet has no synset for the word. The list is sorted so that a
        seeded `random.choice` over it picks the same synonym on every run
        (the iteration order of a set of strings can differ between
        interpreter sessions).

    Note:
        Results are memoised by `lru_cache`, so repeated calls return the
        same list object; callers must not mutate it.
    """
    synonyms = {
        lemma.name()
        for syn in wordnet.synsets(word, lang='ita')
        for lemma in syn.lemmas(lang='ita')
        if lemma.name() != word
    }
    return sorted(synonyms)

def get_synonym(word):
    """Pick a random Italian synonym for `word`, or return `word` if none exists.

    The outcome is printed either way. The choice is made with
    `random.choice`, so it depends on the state of the `random` module.
    """
    candidates = get_italian_synonyms(word)
    if not candidates:
        print(f"No synonym found for: '{word}'")
        return word

    chosen = random.choice(candidates)
    print(f"Original word: '{word}' | Synonym: '{chosen}'")
    return chosen

def replace_frequent_words(text, frequent_words):
    """Replace whole-word occurrences of frequent words with a synonym.

    Args:
        text: whitespace-separated text to rewrite.
        frequent_words: candidate words; each one that occurs in `text` as a
            complete token is looked up once through `get_synonym`.

    Returns:
        `text` with every matching token swapped for its synonym. Whitespace
        is preserved exactly, and `text` is returned unchanged when no
        candidate yields a different synonym.

    Only complete tokens are replaced. A plain `str.replace` would also
    rewrite the word where it is merely a prefix or substring of another
    token (e.g. "danno" inside "dannoso"), and would re-replace synonyms
    inserted by an earlier iteration. Building the mapping first and
    substituting in one pass avoids both problems.
    """
    tokens_in_text = set(text.split())

    # Look up synonyms only for candidates that actually occur in this text.
    replacements = {}
    for word in frequent_words:
        if word in tokens_in_text and word not in replacements:
            synonym = get_synonym(word)
            if synonym != word:
                replacements[word] = synonym

    if not replacements:
        return text

    # One pass over the maximal non-whitespace runs, i.e. the same tokens
    # that text.split() yields.
    return re.sub(
        r'\S+',
        lambda match: replacements.get(match.group(0), match.group(0)),
        text,
    )

# Work on an independent copy: train_data comes straight out of train_test_split,
# and adding a column to such a selection can raise pandas' SettingWithCopyWarning.
train_data = train_data.copy()

# get_synonym picks synonyms with random.choice; seed the RNG here (same value as
# the split's random_state) so the enrichment does not change from run to run.
random.seed(42)

# Group texts by topic: one long string per topic
grouped_texts = train_data.groupby('topic')['text'].apply(lambda x: ' '.join(x)).reset_index()

# Get the 100 most common words for each topic
topic_frequent_words = {}
for topic, text in zip(grouped_texts['topic'], grouped_texts['text']):
    word_counts = Counter(text.split())
    topic_frequent_words[topic] = [word for word, _ in word_counts.most_common(100)]

# Replace the frequent words of each row's own topic with synonyms
train_data['enriched_text'] = train_data.apply(
    lambda row: replace_frequent_words(row['text'], topic_frequent_words[row['topic']]),
    axis=1,
)

# Display a sample of original and enriched text
print("\nSample of original and enriched text:")
print(train_data[['text', 'enriched_text']].head())

print("\nEnrichment process completed.")
train_data.to_csv('Enriched_Data_Topics.csv', index=False)

In [None]:
# Tokenizer
# Download NLTK data files (if not already downloaded)
nltk.download('punkt')

def tokenize_text(text):
    """Split `text` into word tokens with NLTK, using the Italian Punkt model.

    NOTE: this replaces the spaCy-based `tokenize_text` defined in the
    preprocessing cell.
    """
    return word_tokenize(text, language='italian')

# Independent copies, so that adding columns to the train/test splits does not
# trigger pandas' SettingWithCopyWarning.
train_data = train_data.copy()
test_data = test_data.copy()

# Store the tokens in dedicated columns. The string columns must stay untouched:
# the training cell feeds train_data['enriched_text'] to TfidfVectorizer, and
# keeping test_data['text'] as strings makes this cell safe to re-run.
# (The original wrote the training tokens to a misspelled 'enirched_text' column.)
train_data['enriched_tokens'] = train_data['enriched_text'].apply(tokenize_text)
test_data['tokens'] = test_data['text'].apply(tokenize_text)

# Display a sample of the tokenized text
print("\nSample of tokenized text:")
print(train_data['enriched_tokens'].head())

In [None]:

# Seed every random number generator involved (Python, NumPy, TensorFlow)
seed = 42
for seed_fn in (random.seed, np.random.seed, tf.random.set_seed):
    seed_fn(seed)

# Model inputs: the enriched training texts and their topic labels
texts_train = train_data['enriched_text'].values
labels_train = train_data['topic'].values

# 'balanced' class weights, keyed by position in the sorted unique labels
unique_labels = np.unique(labels_train)
class_weights = compute_class_weight(class_weight='balanced', classes=unique_labels, y=labels_train)
class_weights_dict = dict(enumerate(class_weights))

# Integer-encode the labels, then one-hot them for the softmax output
label_encoder = LabelEncoder().fit(labels_train)
encoded_labels_train = label_encoder.transform(labels_train)
categorical_labels_train = to_categorical(encoded_labels_train)

# TF-IDF representation, capped at 10000 features
# NOTE(review): the vectorizer and the SVD below are fit on all of texts_train
# before the validation split is made, so validation rows influence the features.
tfidf_vectorizer = TfidfVectorizer(max_features=10000)
X_train_tfidf = tfidf_vectorizer.fit_transform(texts_train)

# Reduce the TF-IDF matrix to n_components dimensions with TruncatedSVD
n_components = 300  # Adjust the number of components as needed
svd = TruncatedSVD(n_components=n_components, random_state=seed)
X_train_reduced = svd.fit_transform(X_train_tfidf)

# The LSTM expects (samples, timesteps, features); use a single timestep
X_train_lstm = np.expand_dims(X_train_reduced, axis=1)

# Hold out 20% of the rows for validation
X_train, X_val, y_train, y_val = train_test_split(
    X_train_lstm,
    categorical_labels_train,
    test_size=0.2,
    random_state=seed,
)

# LSTM over the single-timestep input, followed by a softmax over the topic classes
model = Sequential([
    LSTM(units=128, input_shape=X_train.shape[1:], dropout=0.3, recurrent_dropout=0.3),
    Dense(units=len(label_encoder.classes_), activation='softmax'),
])

# Adam optimizer with categorical cross-entropy on the one-hot labels
model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

# Fit for 10 epochs, weighting classes and monitoring the validation split
history = model.fit(
    X_train,
    y_train,
    epochs=10,
    batch_size=16,
    validation_data=(X_val, y_val),
    class_weight=class_weights_dict,
)

# Final loss and accuracy on the validation split
loss, accuracy = model.evaluate(X_val, y_val)
print(f"Validation Loss: {loss:.4f}")
print(f"Validation Accuracy: {accuracy:.4f}")

# One figure per metric: training curve first, validation curve second
history_plots = (
    ('accuracy', 'val_accuracy', 'Model Accuracy', 'Accuracy'),
    ('loss', 'val_loss', 'Model Loss', 'Loss'),
)
for train_key, val_key, plot_title, y_label in history_plots:
    plt.plot(history.history[train_key])
    plt.plot(history.history[val_key])
    plt.title(plot_title)
    plt.ylabel(y_label)
    plt.xlabel('Epoch')
    plt.legend(['Train', 'Validation'], loc='upper left')
    plt.show()


In [None]:

# Shorten a label to its first few words (three by default)
def truncate_label(label, max_words=3):
    """Return the first `max_words` whitespace-separated words of `label`, joined by spaces."""
    leading_words = label.split()[:max_words]
    return ' '.join(leading_words)

# Truncate the labels so the axis ticks stay readable
truncated_labels = [truncate_label(label) for label in label_encoder.classes_]

# Every class id known to the encoder, in encoder order. Passing these explicitly
# keeps the matrix and the report aligned with truncated_labels even when a class
# is absent from the validation split (which is not stratified); otherwise the
# tick labels are misaligned and classification_report rejects target_names.
class_ids = np.arange(len(label_encoder.classes_))

# Predict on validation data and turn probabilities / one-hot rows into class ids
y_val_pred = model.predict(X_val)
y_val_pred_classes = np.argmax(y_val_pred, axis=1)
y_val_true_classes = np.argmax(y_val, axis=1)

# Compute confusion matrix (rows: true class, columns: predicted class)
conf_matrix = confusion_matrix(y_val_true_classes, y_val_pred_classes, labels=class_ids)

# Normalize confusion matrix by row (true labels), as percentages.
# Rows of classes with no validation samples stay at 0 instead of dividing by zero.
row_totals = conf_matrix.sum(axis=1, keepdims=True)
conf_matrix_normalized = np.divide(
    conf_matrix.astype('float'),
    row_totals,
    out=np.zeros(conf_matrix.shape, dtype=float),
    where=row_totals > 0,
) * 100

# Plot confusion matrix
plt.figure(figsize=(10, 8))
sns.heatmap(conf_matrix_normalized, annot=False, fmt='.2f', cmap='Blues', xticklabels=truncated_labels, yticklabels=truncated_labels)
plt.xlabel('Predicted')
plt.ylabel('True')
plt.title('Confusion Matrix (Percentage)')
plt.show()

# Print classification report (zero_division=0: classes without samples or
# predictions score 0 instead of emitting undefined-metric warnings)
class_report = classification_report(
    y_val_true_classes,
    y_val_pred_classes,
    labels=class_ids,
    target_names=truncated_labels,
    zero_division=0,
)
print("Classification Report:\n", class_report)


In [None]:
labels = data['topic'].values  # kept for compatibility; the loops below iterate over classes, not rows
class_names = label_encoder.classes_

# Row/column i of conf_matrix refers to a class, not to a row of the dataset, so the
# names must come from the label encoder. If the matrix was built without an explicit
# `labels` argument it only covers the class ids that occur in the validation labels
# or predictions (in sorted order); map its rows back to names accordingly.
if conf_matrix.shape[0] == len(class_names):
    matrix_class_names = class_names
else:
    present_ids = np.union1d(y_val_true_classes, y_val_pred_classes)
    matrix_class_names = class_names[present_ids]

# Print misclassifications with percentages (off-diagonal cells with at least one sample)
misclassifications = []
print("\nMisclassifications:")
for i, actual_class in enumerate(matrix_class_names):
    for j, predicted_class in enumerate(matrix_class_names):
        if i != j and conf_matrix[i, j] > 0:
            count = conf_matrix[i, j]
            percentage = conf_matrix_normalized[i, j]
            misclassifications.append((percentage, actual_class, predicted_class, count))
            print(f"Actual: {actual_class}, Predicted: {predicted_class}, Count: {count}, Percentage: {percentage:.2f}%")

# Sort and print the top 5 highest percentages of misclassifications
misclassifications.sort(reverse=True, key=lambda x: x[0])
top_5_misclassifications = misclassifications[:5]

print("\nTop 5 Highest Misclassification Percentages:")
for percentage, actual_class, predicted_class, count in top_5_misclassifications:
    print(f"Actual class '{actual_class}' misclassified as '{predicted_class}': {count} times ({percentage:.2f}%)")

In [None]:
# Reload the cleaned, lemmatized dataset from disk before the macro-topic relabelling below
data = pd.read_csv('SpecialisticMonotopic_cleaned_Lemmatized.csv')

In [None]:
# Fine-grained topics grouped under the macro topic that replaces them
macro_groups = {
    'Assicurazione': [
        'assicurazione contro i danni',
        'assicurazione obbligatoria rca',
        'contratto di assicurazione',
    ],
    'Procedimenti Legali': [
        'class action azione di classe',
        'danno da irragionevole durata del processo o equa riparazione',
        'lite temeraria',
    ],
    'Danni Specifici': [
        'danni punitivi',
        'danno biologico',
        'danno erariale',
        'danno esistenziale',
        'danno da immissioni',
        'danno morale',
        'danno da morte dei congiunti',
        'danno patrimoniale',
        'danno da perdita di chance',
        'danno alla persona',
        'danno da reato',
        'danno da svalutazione monetaria',
        'danno da vacanza rovinata',
        'danno ai veicoli',
        'danno non patrimoniale',
    ],
    'Violazioni Diritti Personali': [
        'ingiusta detenzione',
        'passeggero terzo trasportato terzi trasportati',
    ],
    'Responsabilità': [
        'responsabilita contrattuale',
        'responsabilita civile dei magistrati',
        'responsabilita dei genitori',
        'responsabilita extracontrattuale',
        'responsabilita dei padroni e committenti',
        'responsabilita per danni da circolazione stradale',
        'responsabilita professionale',
        'responsabilita medica',
        'responsabilita della pa',
        'responsabilita del produttore danno da prodotto difettoso',
        'responsabilita per danno da cose in custodia',
    ],
}

# Invert the grouping into the {fine topic: macro topic} mapping used by replace()
topic_to_macro = {
    fine_topic: macro_topic
    for macro_topic, fine_topics in macro_groups.items()
    for fine_topic in fine_topics
}

# Direct replacement of topics with macro topics
data['topic'] = data['topic'].replace(topic_to_macro)

print("Topic replacement completed.")
print(data)

# Distribution of macro topics
print("\nDistribution of macro topics:")
print(data['topic'].value_counts())

# Check for any unchanged topics, i.e. values that are not one of the macro topic names
print("\nUnchanged topics:")
print(data[~data['topic'].isin(list(macro_groups))]['topic'].unique())

In [None]:
# Fetch the NLTK resources needed for the Italian WordNet lookups
for corpus_name in ('omw-1.4', 'wordnet'):
    nltk.download(corpus_name)

# Hold out 20% of the macro-topic corpus as test data, keeping topic proportions (stratified)
train_data, test_data = train_test_split(
    data,
    test_size=0.2,
    random_state=42,
    stratify=data['topic'],
)

@lru_cache(maxsize=1000)
def get_italian_synonyms(word):
    """Return the Italian WordNet lemma names that share a synset with `word`.

    Args:
        word: the word to look up (`lang='ita'`).

    Returns:
        A sorted list of distinct lemma names other than `word` itself; empty
        when WordNet has no synset for the word. The list is sorted so that a
        seeded `random.choice` over it picks the same synonym on every run
        (the iteration order of a set of strings can differ between
        interpreter sessions).

    Note:
        Results are memoised by `lru_cache`, so repeated calls return the
        same list object; callers must not mutate it.
    """
    synonyms = {
        lemma.name()
        for syn in wordnet.synsets(word, lang='ita')
        for lemma in syn.lemmas(lang='ita')
        if lemma.name() != word
    }
    return sorted(synonyms)

def get_synonym(word):
    """Pick a random Italian synonym for `word`, or return `word` if none exists.

    The outcome is printed either way. The choice is made with
    `random.choice`, so it depends on the state of the `random` module.
    """
    candidates = get_italian_synonyms(word)
    if not candidates:
        print(f"No synonym found for: '{word}'")
        return word

    chosen = random.choice(candidates)
    print(f"Original word: '{word}' | Synonym: '{chosen}'")
    return chosen

def replace_frequent_words(text, frequent_words):
    """Replace whole-word occurrences of frequent words with a synonym.

    Args:
        text: whitespace-separated text to rewrite.
        frequent_words: candidate words; each one that occurs in `text` as a
            complete token is looked up once through `get_synonym`.

    Returns:
        `text` with every matching token swapped for its synonym. Whitespace
        is preserved exactly, and `text` is returned unchanged when no
        candidate yields a different synonym.

    Only complete tokens are replaced. A plain `str.replace` would also
    rewrite the word where it is merely a prefix or substring of another
    token (e.g. "danno" inside "dannoso"), and would re-replace synonyms
    inserted by an earlier iteration. Building the mapping first and
    substituting in one pass avoids both problems.
    """
    tokens_in_text = set(text.split())

    # Look up synonyms only for candidates that actually occur in this text.
    replacements = {}
    for word in frequent_words:
        if word in tokens_in_text and word not in replacements:
            synonym = get_synonym(word)
            if synonym != word:
                replacements[word] = synonym

    if not replacements:
        return text

    # One pass over the maximal non-whitespace runs, i.e. the same tokens
    # that text.split() yields.
    return re.sub(
        r'\S+',
        lambda match: replacements.get(match.group(0), match.group(0)),
        text,
    )

# Work on an independent copy: train_data comes straight out of train_test_split,
# and adding a column to such a selection can raise pandas' SettingWithCopyWarning.
train_data = train_data.copy()

# get_synonym picks synonyms with random.choice; seed the RNG here (same value as
# the split's random_state) so the enrichment does not change from run to run.
random.seed(42)

# Group texts by topic: one long string per macro topic
grouped_texts = train_data.groupby('topic')['text'].apply(lambda x: ' '.join(x)).reset_index()

# Get the 100 most common words for each topic
topic_frequent_words = {}
for topic, text in zip(grouped_texts['topic'], grouped_texts['text']):
    word_counts = Counter(text.split())
    topic_frequent_words[topic] = [word for word, _ in word_counts.most_common(100)]

# Replace the frequent words of each row's own topic with synonyms
train_data['enriched_text'] = train_data.apply(
    lambda row: replace_frequent_words(row['text'], topic_frequent_words[row['topic']]),
    axis=1,
)

# Display a sample of original and enriched text
print("\nSample of original and enriched text:")
print(train_data[['text', 'enriched_text']].head())

print("\nEnrichment process completed.")
train_data.to_csv('Enriched_Data_Macro_Topics.csv', index=False)

In [None]:
# Tokenizer
# Download NLTK data files (if not already downloaded)
nltk.download('punkt')

def tokenize_text(text):
    """Split `text` into word tokens with NLTK, using the Italian Punkt model.

    NOTE: this replaces any `tokenize_text` defined by earlier cells.
    """
    return word_tokenize(text, language='italian')

# Independent copies, so that adding columns to the train/test splits does not
# trigger pandas' SettingWithCopyWarning.
train_data = train_data.copy()
test_data = test_data.copy()

# Store the tokens in dedicated columns. The string columns must stay untouched:
# the training cell feeds train_data['enriched_text'] to TfidfVectorizer, and
# keeping test_data['text'] as strings makes this cell safe to re-run.
# (The original wrote the training tokens to a misspelled 'enirched_text' column.)
train_data['enriched_tokens'] = train_data['enriched_text'].apply(tokenize_text)
test_data['tokens'] = test_data['text'].apply(tokenize_text)

# Display a sample of the tokenized text
print("\nSample of tokenized text:")
print(train_data['enriched_tokens'].head())

In [None]:

# Seed every random number generator involved (Python, NumPy, TensorFlow)
seed = 42
for seed_fn in (random.seed, np.random.seed, tf.random.set_seed):
    seed_fn(seed)

# Model inputs: the enriched training texts and their macro-topic labels
texts_train = train_data['enriched_text'].values
labels_train = train_data['topic'].values

# 'balanced' class weights, keyed by position in the sorted unique labels
unique_labels = np.unique(labels_train)
class_weights = compute_class_weight(class_weight='balanced', classes=unique_labels, y=labels_train)
class_weights_dict = dict(enumerate(class_weights))

# Integer-encode the labels, then one-hot them for the softmax output
label_encoder = LabelEncoder().fit(labels_train)
encoded_labels_train = label_encoder.transform(labels_train)
categorical_labels_train = to_categorical(encoded_labels_train)

# TF-IDF representation, capped at 10000 features
# NOTE(review): the vectorizer and the SVD below are fit on all of texts_train
# before the validation split is made, so validation rows influence the features.
tfidf_vectorizer = TfidfVectorizer(max_features=10000)
X_train_tfidf = tfidf_vectorizer.fit_transform(texts_train)

# Reduce the TF-IDF matrix to n_components dimensions with TruncatedSVD
n_components = 300  # Adjust the number of components as needed
svd = TruncatedSVD(n_components=n_components, random_state=seed)
X_train_reduced = svd.fit_transform(X_train_tfidf)

# The LSTM expects (samples, timesteps, features); use a single timestep
X_train_lstm = np.expand_dims(X_train_reduced, axis=1)

# Hold out 20% of the rows for validation
X_train, X_val, y_train, y_val = train_test_split(
    X_train_lstm,
    categorical_labels_train,
    test_size=0.2,
    random_state=seed,
)

# LSTM over the single-timestep input, followed by a softmax over the macro-topic classes
model = Sequential([
    LSTM(units=128, input_shape=X_train.shape[1:], dropout=0.1, recurrent_dropout=0.1),
    Dense(units=len(label_encoder.classes_), activation='softmax'),
])

# Adam optimizer with categorical cross-entropy on the one-hot labels
model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

# Fit for 10 epochs, weighting classes and monitoring the validation split
history = model.fit(
    X_train,
    y_train,
    epochs=10,
    batch_size=16,
    validation_data=(X_val, y_val),
    class_weight=class_weights_dict,
)

# Final loss and accuracy on the validation split
loss, accuracy = model.evaluate(X_val, y_val)
print(f"Validation Loss: {loss:.4f}")
print(f"Validation Accuracy: {accuracy:.4f}")

# One figure per metric: training curve first, validation curve second
history_plots = (
    ('accuracy', 'val_accuracy', 'Model Accuracy', 'Accuracy'),
    ('loss', 'val_loss', 'Model Loss', 'Loss'),
)
for train_key, val_key, plot_title, y_label in history_plots:
    plt.plot(history.history[train_key])
    plt.plot(history.history[val_key])
    plt.title(plot_title)
    plt.ylabel(y_label)
    plt.xlabel('Epoch')
    plt.legend(['Train', 'Validation'], loc='upper left')
    plt.show()


In [None]:

# Shorten a label to its first few words (three by default)
def truncate_label(label, max_words=3):
    """Return the first `max_words` whitespace-separated words of `label`, joined by spaces."""
    leading_words = label.split()[:max_words]
    return ' '.join(leading_words)

# Truncate the labels so the axis ticks stay readable
truncated_labels = [truncate_label(label) for label in label_encoder.classes_]

# Every class id known to the encoder, in encoder order. Passing these explicitly
# keeps the matrix and the report aligned with truncated_labels even when a class
# is absent from the validation split (which is not stratified); otherwise the
# tick labels are misaligned and classification_report rejects target_names.
class_ids = np.arange(len(label_encoder.classes_))

# Predict on validation data and turn probabilities / one-hot rows into class ids
y_val_pred = model.predict(X_val)
y_val_pred_classes = np.argmax(y_val_pred, axis=1)
y_val_true_classes = np.argmax(y_val, axis=1)

# Compute confusion matrix (rows: true class, columns: predicted class)
conf_matrix = confusion_matrix(y_val_true_classes, y_val_pred_classes, labels=class_ids)

# Normalize confusion matrix by row (true labels), as percentages.
# Rows of classes with no validation samples stay at 0 instead of dividing by zero.
row_totals = conf_matrix.sum(axis=1, keepdims=True)
conf_matrix_normalized = np.divide(
    conf_matrix.astype('float'),
    row_totals,
    out=np.zeros(conf_matrix.shape, dtype=float),
    where=row_totals > 0,
) * 100

# Plot confusion matrix
plt.figure(figsize=(10, 8))
sns.heatmap(conf_matrix_normalized, annot=False, fmt='.2f', cmap='Blues', xticklabels=truncated_labels, yticklabels=truncated_labels)
plt.xlabel('Predicted')
plt.ylabel('True')
plt.title('Confusion Matrix (Percentage)')
plt.show()

# Print classification report (zero_division=0: classes without samples or
# predictions score 0 instead of emitting undefined-metric warnings)
class_report = classification_report(
    y_val_true_classes,
    y_val_pred_classes,
    labels=class_ids,
    target_names=truncated_labels,
    zero_division=0,
)
print("Classification Report:\n", class_report)


In [None]:
labels = data['topic'].values  # kept for compatibility; the loops below iterate over classes, not rows
class_names = label_encoder.classes_

# Row/column i of conf_matrix refers to a class, not to a row of the dataset, so the
# names must come from the label encoder. If the matrix was built without an explicit
# `labels` argument it only covers the class ids that occur in the validation labels
# or predictions (in sorted order); map its rows back to names accordingly.
if conf_matrix.shape[0] == len(class_names):
    matrix_class_names = class_names
else:
    present_ids = np.union1d(y_val_true_classes, y_val_pred_classes)
    matrix_class_names = class_names[present_ids]

# Print misclassifications with percentages (off-diagonal cells with at least one sample)
misclassifications = []
print("\nMisclassifications:")
for i, actual_class in enumerate(matrix_class_names):
    for j, predicted_class in enumerate(matrix_class_names):
        if i != j and conf_matrix[i, j] > 0:
            count = conf_matrix[i, j]
            percentage = conf_matrix_normalized[i, j]
            misclassifications.append((percentage, actual_class, predicted_class, count))
            print(f"Actual: {actual_class}, Predicted: {predicted_class}, Count: {count}, Percentage: {percentage:.2f}%")

# Sort and print the top 3 highest percentages of misclassifications
misclassifications.sort(reverse=True, key=lambda x: x[0])
top_3_misclassifications = misclassifications[:3]

print("\nTop 3 Highest Misclassification Percentages:")
for percentage, actual_class, predicted_class, count in top_3_misclassifications:
    print(f"Actual class '{actual_class}' misclassified as '{predicted_class}': {count} times ({percentage:.2f}%)")