In [2]:
import numpy as np
import matplotlib.pyplot as plt
import scipy as sp
import math
from sklearn.base import BaseEstimator, ClassifierMixin
from sklearn.model_selection import GridSearchCV
import pandas as pd
from itertools import combinations
import re
from sklearn.metrics import precision_score, make_scorer, accuracy_score

In [None]:
def non_linear_penalty(prob, confidence, sharpness=10):
    return 2 / (1 + math.exp(-sharpness * (prob - confidence)))

def word_score(scores):
    product = 1.0
    for s in scores:
        product *= max(s, 1e-6)
    return product ** (1.0 / len(scores)) if scores else 0

In [None]:
class SpeechClassifier_FR(BaseEstimator, ClassifierMixin):
    def __init__(self, p_threshold=0.5, w_threshold=0.5, confidence=0.5, sharpness=1.0):
        self.p_threshold = p_threshold
        self.w_threshold = w_threshold
        self.confidence = confidence
        self.sharpness = sharpness

    def fit(self, X, y):
        self.words_ = X.iloc[:, 0]
        self.targets_ = X.iloc[:, 1]
        self.deletions_ = X.iloc[:, 2]
        self.classes_ = np.unique(y)
        return self

    def predict(self, X):
        words_list = X.iloc[:, 0]
        targets_list = X.iloc[:, 1]
        deletions_list = X.iloc[:, 2] if X.shape[1] > 2 else [None] * len(X)

        predictions = []
        for words, target, deletion in zip(words_list, targets_list, deletions_list):
            result = self.classifier(target, words, deletion)
            predictions.append(int(result))
        return np.array(predictions)

    def classifier(self, target, words, deletion=None):
        score_target = len(target)
        for w in words:
            score_word = len(w)
            if score_word < score_target:
                continue
            for i in range(score_word - score_target + 1):
                Scores = []
                for pw, pt in zip(w[i:i + score_target], target):
                    if deletion is not None and i!=0:
                        if np.argmax(w[i-1]) == deletion:
                            break
                    s = 0
                    prob = pw[pt]
                    if prob > self.confidence:
                        s = 1
                    else:
                        s = non_linear_penalty(prob, sharpness=self.sharpness, confidence=self.confidence)

                    Scores.append(s)

                if word_score(Scores) > self.w_threshold:
                    for score in Scores:
                        if score < self.p_threshold:
                            break
                        return True
        return False
    

class SpeechClassifier_IT(BaseEstimator, ClassifierMixin):
    def __init__(self, p_threshold=0.5, w_threshold=0.5, confidence=0.5, sharpness=1.0):
        self.p_threshold = p_threshold
        self.w_threshold = w_threshold
        self.confidence = confidence
        self.sharpness = sharpness

    def fit(self, X, y):
        self.words_ = X.iloc[:, 0]
        self.targets_ = X.iloc[:, 1]
        self.classes_ = np.unique(y)
        return self

    def predict(self, X):
        words_list = X.iloc[:, 0]
        targets_list = X.iloc[:, 1]

        predictions = []
        for words, target in zip(words_list, targets_list):
            result = self.classifier(target, words)
            predictions.append(result)
        return np.array(predictions)


    def label(self, target, word):
        S_t = len(target)
        S_w = len(word)
        if S_w < S_t:
            return False
        for i in range(S_w - S_t + 1):
            scores = []
            for pw, pt in zip(word[i:i + S_t], target):
                s = 0
                prob = pw[pt]
                if prob > self.confidence:
                    s = 1
                else:
                    s = non_linear_penalty(prob, sharpness=self.sharpness, confidence=self.confidence)

                scores.append(s)

            if word_score(scores) > self.w_threshold:
                for score in scores:
                    if score < self.p_threshold:
                        break
                    return True
        return False
    
    def classifier(self,target,words):
        labels = []
        for i, word in enumerate(words):
            label = self.label(target[i], word)
            # For swaps, one element is classified as True and the other as False
            labels.append(label)
        return labels

In [5]:
def strings_to_position_tokens(strings):

    vocab = sorted(set("".join(strings)))
    vocab_dict = {char: idx for idx, char in enumerate(vocab)}
    vocab_size = len(vocab)

    result = []
    for s in strings:
        one_hot_string = []
        for char in s:
            one_hot = [0] * vocab_size
            one_hot[vocab_dict[char]] = 1
            one_hot_string.append(one_hot)
        result.append(one_hot_string)

    return result, vocab

def strings_to_char_positions(strings, vocab):
    vocab_dict = {char: idx for idx, char in enumerate(vocab)}
    return [
        [vocab_dict[char] for char in s if char in vocab_dict]
        for s in strings
    ]

def string_to_onehot_list(s, vocab_dict, vocab_size):
    return [[1 if i == vocab_dict[c] else 0 for i in range(vocab_size)] for c in s if c in vocab_dict]


def string_to_gaussian_encoding(s, vocab_dict, vocab_size, sigma=1.0):
    def gaussian_vector(center, size, sigma):
        x = np.arange(size)
        gaussian = np.exp(-0.5 * ((x - center) / sigma) ** 2)
        return (gaussian / np.max(gaussian)).tolist()

    return [
        gaussian_vector(vocab_dict[c], vocab_size, sigma)
        for c in s if c in vocab_dict
    ]


def get_fp_fn(y_true, y_pred):
    false_positives = [i for i, (yt, yp) in enumerate(zip(y_true, y_pred)) if yt == 0 and yp == 1]
    false_negatives = [i for i, (yt, yp) in enumerate(zip(y_true, y_pred)) if yt == 1 and yp == 0]
    return false_positives, false_negatives


#### <u>FR classifier test</u>

##### Real data

In [120]:
output_FR = pd.read_csv('phoneme_tokenizer\\output_FR.csv')
output_FR['probabilities'] = output_FR['probabilities'].apply(eval)
output_FR['timestamps'] = output_FR['timestamps'].apply(eval)
vocab = pd.read_json('phoneme_tokenizer\\vocab.json', typ='series').keys().tolist()

ground_truth_fr = pd.read_csv('1_Ground_truth\\Phoneme_Deleletion_ground_truth_FR.csv')[['file_name', 'API_target','accuracy_coder1']]
output_FR = pd.merge(output_FR, ground_truth_fr, left_on='file_name', right_on='file_name')
output_FR = output_FR[['probabilities','API_target','accuracy_coder1']]

output_FR['API_target'] = output_FR['API_target'].apply(lambda x: re.sub(r'[\[\]]', '', x))

In [121]:
output_FR

Unnamed: 0,probabilities,API_target,accuracy_coder1
0,"[[0.018307151272892952, 0.019523443654179573, ...",əval,1.0
1,"[[0.017722373828291893, 0.01870276965200901, 0...",ʁɔmaʒ,1.0


In [122]:
vocab

['ʒ',
 'ɹ',
 'j',
 'd',
 'ɲ',
 'ʌ',
 '[UNK]',
 'ɒ',
 'ɐ',
 'ʃ',
 'ɔ',
 'f',
 'ø',
 'z',
 'ŋ',
 'i',
 'u',
 '̃',
 'o',
 'œ',
 'a',
 '(',
 'ə',
 'ɜ',
 'ɾ',
 'ː',
 '̪',
 'e',
 'b',
 'ʁ',
 'w',
 'n',
 'p',
 'y',
 'ɡ',
 'ɪ',
 'r',
 'v',
 't',
 ')',
 'm',
 'k',
 'ʊ',
 'ʎ',
 'ɑ',
 's',
 'l',
 '[PAD]',
 'ɛ']

In [None]:
target = strings_to_char_positions(output_FR['API_target'].astype(str).tolist(), vocab)
target_series = pd.Series(target, name='target_tokens').reset_index(drop=True)

In [124]:
target_series

0       [22, 37, 20, 46]
1    [29, 10, 40, 20, 0]
Name: target_tokens, dtype: object

In [130]:
X = pd.concat([output_FR['probabilities'].apply(lambda x: [x]).reset_index(drop=True), target_series, pd.Series([None] * len(output_FR), name='third_column')], axis=1)
y = output_FR['accuracy_coder1'].reset_index(drop=True)

In [132]:
param_grid = {
    'p_threshold': np.arange(0.1, 1.0, 0.1),
    'w_threshold': np.arange(0.1, 1.0, 0.1),
    'confidence': np.arange(0.1, 1.0, 0.1),
    'sharpness': np.arange(1, 10, 1),
}

grid = GridSearchCV(SpeechClassifier_FR(), param_grid, scoring='accuracy',cv=2)
grid.fit(X, y)

In [133]:
grid.best_score_

1.0

##### Fictive data

In [None]:
df = pd.read_csv('Hackathon_ASR/1_Ground_truth/Phoneme_Deleletion_ground_truth_FR.csv')
phoneme = pd.read_csv('2_Phoneme_Deleletion_ground_truth_FR.csv')['trial_answer_coder1_phoneme']
df = df[['API_target', 'accuracy_coder1','phase']]
df['API_target'] = df['API_target'].apply(lambda x: re.sub(r'[\[\]]', '', x))
df = pd.concat([df, phoneme], axis=1)
df = df[df['phase'] == 'REPETITION'].drop(columns=['phase'])

FileNotFoundError: [Errno 2] No such file or directory: '2_Phoneme_Deleletion_ground_truth_FR.csv'

In [24]:
df

Unnamed: 0,API_target,accuracy_coder1,trial_answer_coder1_phoneme
18,plabiʁ,0.0,abiʁ
19,fʁɔmaʒ,0.0,omaʒ
20,baneɔ̃,0.0,animo
21,kɔ̃da,0.0,[PAD][UNK][PAD]
22,ʃəval,1.0,øval
...,...,...,...
5484,plak,1.0,peɛl[PAD]peɛl[PAD]lak
5485,pʁotal,1.0,peɛʁ[PAD]peɛʁ[PAD]pʁo[PAD]ʁotal
5486,spydjɔ,1.0,ɛs[PAD]pe[PAD]pydjo
5487,solysjɔ̃,1.0,olysjɔ̃


In [None]:
df_cleaned = df[~df.apply(lambda row: row.astype(str).str.contains(r"[{}]").any(), axis=1) & ~df['trial_answer_coder1_phoneme'].str.contains(r"\[UNK\]")].copy()

all_strings = df_cleaned['API_target'].astype(str).tolist() + df_cleaned['trial_answer_coder1_phoneme'].astype(str).tolist()
_, vocab = strings_to_position_tokens(all_strings)

vocab_dict = {char: i for i, char in enumerate(vocab)}
vocab_size = len(vocab)

df_cleaned['answered'] = df_cleaned['trial_answer_coder1_phoneme'].astype(str).apply(lambda s: string_to_gaussian_encoding(s, vocab_dict, vocab_size))
target = strings_to_char_positions(df_cleaned['API_target'].astype(str).tolist(), vocab)

target_series = pd.Series(target, name='target_tokens')
df_cleaned_reset = df_cleaned.reset_index(drop=True)
target_series_reset = target_series.reset_index(drop=True)

X = pd.concat([df_cleaned_reset['answered'], target_series_reset, pd.Series([None] * len(df_cleaned_reset), name='third_column')], axis=1)

df_cleaned_reset['accuracy_coder1'] = df_cleaned_reset['accuracy_coder1'].fillna(0).astype(int)
y = df_cleaned_reset['accuracy_coder1'].apply(lambda x: int(x)).values

In [52]:
X['answered'] = X['answered'].apply(lambda x: [x])

In [53]:
X.iloc[:,0]

0       [[[2.289734845645553e-11, 1.522997974471263e-0...
1       [[[4.408531331463226e-71, 1.7556880978548265e-...
2       [[[2.289734845645553e-11, 1.522997974471263e-0...
3       [[[2.394254760948756e-183, 5.709040105864101e-...
4       [[[4.408531331463226e-71, 1.7556880978548265e-...
                              ...                        
2239    [[[4.0723586257611754e-79, 4.408531331463226e-...
2240    [[[4.0723586257611754e-79, 4.408531331463226e-...
2241    [[[3.7772499723621244e-282, 9.87710872151989e-...
2242    [[[4.408531331463226e-71, 1.7556880978548265e-...
2243    [[[2.7487850079102147e-43, 2.005008781961654e-...
Name: answered, Length: 2244, dtype: object

In [54]:
param_grid = {
    'p_threshold': np.arange(0.1, 1.0, 0.1),
    'w_threshold': np.arange(0.1, 1.0, 0.1),
    'confidence': np.arange(0.1, 1.0, 0.1),
    'sharpness': np.arange(1, 10, 1),
}

grid = GridSearchCV(SpeechClassifier_FR(), param_grid, scoring='accuracy', cv=4)
grid.fit(X, y)

In [56]:
grid.best_score_

0.7196969696969696

In [45]:
fp, fn = get_fp_fn(y, grid.predict(X))

In [49]:
len(fn)

556

In [48]:
len(fp)

77

In [368]:
pred = grid.predict(X)

In [None]:
fp, fn = get_fp_fn(y, pred)

In [370]:
df_cleaned['trial_answer_coder2'].reset_index(drop=True)[fn]

3                   onda
4                  euval
8                  anari
10                  ikan
11                   ato
              ...       
2216            am améra
2218    pr pro pro rotal
2219           s p pudio
2220             olucion
2221           k aravane
Name: trial_answer_coder2, Length: 1106, dtype: object

In [371]:
df_cleaned['trial_answer_coder2'].reset_index(drop=True)[fp]

69        glotte otre otte
91      gloire gloire oire
121           gatine itine
497                églotte
612               éfrisson
1327               ispalio
1502                aplage
2083                glotus
Name: trial_answer_coder2, dtype: object

In [372]:
grid.best_score_

0.9829645567904581

#### <u> IT classifier test </u>

In [76]:
def clean_and_split(text):
    if pd.isna(text):
        return []

    cleaned = re.sub(r'{.*?}', '', text)
    cleaned = re.sub(r'\[.*?\]', '', cleaned)

    cleaned = re.sub(r'[^a-zA-Zàèò().\s]', '', cleaned)

    cleaned = cleaned.strip()

    return cleaned.split()        

df_it = pd.read_csv("1_Ground_truth/Decoding_ground_truth_IT.csv")
df_it['trial_answer_coder1'] = df_it['trial_answer_coder1'].apply(clean_and_split)
df_it = df_it[df_it['trial_answer_coder1'].apply(len) == 12]
config = df_it["config"].apply(lambda x: x.strip(";").split(";"))
accuracy_coder1 = df_it["accuracy_coder1"].apply(lambda x: [int(i) if i != 'NA' else 0 for i in x.split(" ")])
trial_answer_coder1 = df_it["trial_answer_coder1"]

In [77]:
all_strings = [item for sublist in config.tolist() for item in sublist] + \
			  [item for sublist in trial_answer_coder1.tolist() for item in sublist]
_, vocab = strings_to_position_tokens(all_strings)

vocab_dict = {char: i for i, char in enumerate(vocab)}
vocab_size = len(vocab)

def string_to_gaussian_encoding(s, vocab_dict, vocab_size, sigma=1.0):
    def gaussian_vector(center, size, sigma):
        x = np.arange(size)
        gaussian = np.exp(-0.5 * ((x - center) / sigma) ** 2)
        return (gaussian / np.max(gaussian)).tolist()

    encoding = []
    for c in s:
        if c in vocab_dict:
            encoding.append(gaussian_vector(vocab_dict[c], vocab_size, sigma))
    return encoding

answers = []
for trial in trial_answer_coder1.tolist():
    trial_encodings = []
    for s in trial:
        if isinstance(s, str):
            encoded = string_to_gaussian_encoding(s, vocab_dict, vocab_size)
            trial_encodings.append(encoded)
    answers.append(trial_encodings)
targets = [strings_to_char_positions(c, vocab) for c in config]

targets = pd.Series(targets, name='target_tokens').reset_index(drop=True)
answers = pd.Series(answers).reset_index(drop=True)

In [80]:
X = pd.concat([answers, targets], axis=1)
y = accuracy_coder1.tolist()

In [103]:
param_grid = {
    'p_threshold': np.arange(0.1, 1.0, 0.2),
    'w_threshold': np.arange(0.1, 1.0, 0.2),
    'confidence': np.arange(0.1, 1.0, 0.2),
    'sharpness': np.arange(1, 10, 2),
}

custom_scorer = make_scorer(precision_score, average='macro')

grid = GridSearchCV(SpeechClassifier_IT(), param_grid, scoring='accuracy', cv=4)
grid.fit(X, y)

In [105]:
grid.best_score_, grid.best_params_

(0.4702819272038712,
 {'confidence': 0.7000000000000001,
  'p_threshold': 0.9000000000000001,
  'sharpness': 3,
  'w_threshold': 0.1})