In [None]:
import os
import config
from config import logger
import pandas as pd
from sklearn.utils import resample
from matplotlib import pyplot as plt


def fetch_audio_files(path):
    """Recursively collect the paths of all WAV files below ``path``.

    Directories and the files inside them are both visited in sorted order,
    so the returned list is the same on every run and platform. The extension
    check is case-insensitive (``.wav`` and ``.WAV`` both match).

    :param path: root directory to search.
    :return: list of file paths (``os.path.join(root, name)``) in traversal order.
    """
    audio_files = []
    for root, dirs, files in os.walk(path):
        # os.walk honours in-place changes to ``dirs``; sorting it makes the
        # order in which sub-directories are visited deterministic.
        dirs.sort()
        for f in sorted(files):
            if f.lower().endswith('.wav'):
                audio_files.append(os.path.join(root, f))
    logger.info(f"Successfully fetched {len(audio_files)} (.wav) audio files!")
    return audio_files


def get_user_input(prompt, choices):
    """Prompt on stdin until the user enters one of ``choices``.

    The answer is stripped of surrounding whitespace and lower-cased before it
    is compared, so ``" Yes "`` is accepted when ``"yes"`` is a valid choice.

    :param prompt: text shown to the user by ``input``.
    :param choices: container of accepted (lower-case) answers.
    :return: the normalised answer that matched.
    """
    while True:
        answer = input(prompt).strip().lower()
        if answer in choices:
            return answer
        logger.info('Invalid input. Please try again.')


def df_to_csv(df, file_path):
    """Persist ``df`` as CSV at ``file_path`` (index column omitted) and log the target.

    :param df: DataFrame to serialise.
    :param file_path: destination handed straight to ``DataFrame.to_csv``.
    """
    df.to_csv(path_or_buf=file_path, index=False)
    message = f"Writing {file_path}..."
    logger.info(message)


def add_train_scores(df):
    """Attach MMSE score and binary diagnosis to the transcript rows.

    The score sheet is read from ``config.diagnosis_train_scores``, its
    ``adressfname``/``dx`` columns are renamed to ``addressfname``/``diagnosis``,
    it is passed through ``binarize_labels`` and finally inner-joined onto
    ``df`` via ``addressfname``.

    :param df: frame holding at least an ``addressfname`` column.
    :return: merged frame with the extra ``mmse`` and ``diagnosis`` columns.
    """
    logger.debug(df)

    column_renames = {'adressfname': 'addressfname', 'dx': 'diagnosis'}
    raw_scores = pd.read_csv(config.diagnosis_train_scores)
    scores_df = binarize_labels(raw_scores.rename(columns=column_renames))
    logger.debug(scores_df)

    # Only the join key and the two target columns are taken from the score sheet.
    wanted = scores_df[['addressfname', 'mmse', 'diagnosis']]
    merged = pd.merge(df, wanted, on='addressfname', how='inner')

    logger.debug(merged)
    return merged


def binarize_labels(df):
    """Encode ``diagnosis`` as 0/1 and balance the two classes by downsampling.

    ``'ad'`` becomes 1, every other label becomes 0. Whichever class has more
    rows is then randomly downsampled (without replacement, fixed seed 42) to
    the size of the other class. When both classes have the same size, class 1
    is the one passed through ``resample``.

    The input frame is left untouched; a new frame is returned. Calling the
    function on an already binarised frame would map every label to 0, so it
    must only be applied to the raw string labels.

    :param df: frame with a ``diagnosis`` column of string labels.
    :return: balanced frame, downsampled class first, followed by the other class.
    """
    # ``assign`` builds a new frame instead of overwriting the caller's column.
    labelled = df.assign(diagnosis=[1 if label == 'ad' else 0 for label in df['diagnosis']])

    positives = labelled[labelled['diagnosis'] == 1]
    negatives = labelled[labelled['diagnosis'] == 0]

    # Pick the majority by size: resample(replace=False) raises when asked for
    # more rows than the frame it samples from contains.
    if len(positives) >= len(negatives):
        df_majority, df_minority = positives, negatives
    else:
        df_majority, df_minority = negatives, positives

    df_majority_downsampled = resample(df_majority,
                                       replace=False,
                                       n_samples=len(df_minority),
                                       random_state=42)

    return pd.concat([df_majority_downsampled, df_minority])


In [None]:
import logging


class Formatter(logging.Formatter):
    """Log formatter that prints INFO records bare and decorates all other levels.

    Non-INFO records are prefixed with an ANSI-coloured
    ``LEVEL [file:line]`` tag; levels without a colour entry use code 0.
    """

    def format(self, record):
        """Select the format string for ``record``'s level, then format it."""
        if record.levelno == logging.INFO:
            template = "%(message)s"
        else:
            ansi_by_level = {
                logging.DEBUG: 36,
                logging.WARNING: 33,
                logging.ERROR: 31,
                logging.FATAL: 31,
            }
            ansi = ansi_by_level.get(record.levelno, 0)
            template = f"\033[{ansi}m%(levelname)s [%(filename)s:%(lineno)d]\033[0m: %(message)s"

        # The active format string lives on the formatter's style object.
        self._style._fmt = template
        return super().format(record)  # type: ignore

In [None]:
import config
from config import logger
from utils import fetch_audio_files, df_to_csv, add_train_scores
from pathlib import Path
import whisper
import os
import codecs
import pandas as pd


def transcribe():
    """Run the whole transcription stage for the train and test audio.

    Loads the Whisper model named by ``config.whisper_model_name``, transcribes
    every fetched audio file that has no transcription yet, collects the
    transcriptions into data frames (the train frame also receives its scores)
    and writes both frames to the configured CSV paths.
    """
    whisper_model = whisper.load_model(config.whisper_model_name)

    logger.info("Initiating transcription...")

    train_out_dir = config.diagnosis_train_transcription_dir
    test_out_dir = config.diagnosis_test_transcription_dir

    # Discover the audio of both splits first (train, then test) ...
    pending = []
    for audio_root, out_dir in ((config.diagnosis_train_data, train_out_dir),
                                (config.diagnosis_test_data, test_out_dir)):
        found = fetch_audio_files(audio_root)
        logger.debug(found)
        pending.append((found, out_dir))

    # ... then transcribe them in the same order.
    for found, out_dir in pending:
        write_transcription(found, out_dir, whisper_model)

    train_df = add_train_scores(transcription_to_df(train_out_dir))
    test_df = transcription_to_df(test_out_dir)

    for frame, csv_path in ((train_df, config.train_scraped_path),
                            (test_df, config.test_scraped_path)):
        df_to_csv(frame, csv_path)

    logger.info("Transcription done.")


def write_transcription(audio_files, transcription_dir, whisper_model):
    """Transcribe each audio file that has no transcription file yet.

    The transcription of ``<dir>/<name>.wav`` is stored as
    ``transcription_dir/<name>`` (the file stem, no extension). Files whose
    transcription already exists are skipped, whichever model produced them.

    :param audio_files: iterable of audio file paths.
    :param transcription_dir: output directory (``Path`` or string); created on demand.
    :param whisper_model: object exposing ``transcribe(path, fp16=...)`` that
        returns a mapping with a ``"text"`` entry.
    """
    target_dir = Path(transcription_dir)

    for audio_file in audio_files:
        filename = Path(audio_file).stem
        transcription_file = (target_dir / filename).resolve()

        if not transcription_file.exists():
            result = whisper_model.transcribe(audio_file, fp16=False)
            transcription_str = str(result["text"])

            transcription_file.parent.mkdir(parents=True, exist_ok=True)

            # The transcriptions are read back as UTF-8, so write them as UTF-8
            # explicitly instead of relying on the platform's default encoding.
            transcription_file.write_text(transcription_str, encoding='utf-8')
            logger.info(f"Transcribed {transcription_file}...")


def transcription_to_df(data_dir):
    """Load every transcription file below ``data_dir`` into a DataFrame.

    Each file becomes one row: ``addressfname`` is the file name and
    ``transcript`` its UTF-8 decoded content (undecodable bytes are ignored).
    Inside the transcript, newlines, literal backslash-n sequences and double
    spaces are each replaced by a single space. Rows are sorted by
    ``addressfname`` and re-indexed from 0.

    :param data_dir: directory that is walked recursively.
    :return: DataFrame with the columns ``addressfname`` and ``transcript``.
    """
    texts = []

    for root, dirs, files in os.walk(data_dir):
        for file in files:
            with open(os.path.join(root, file), 'r', encoding='utf-8', errors='ignore') as f:
                texts.append((file, f.read()))

    df = pd.DataFrame(texts, columns=['addressfname', 'transcript'])

    # Every substitution must go through the ``.str`` accessor: a plain
    # ``Series.replace`` only swaps cells whose *entire* value equals the
    # pattern. ``regex=False`` keeps the backslash-n pattern literal.
    transcript = df['transcript']
    for needle in ('\n', '\\n', '  '):
        transcript = transcript.str.replace(needle, ' ', regex=False)
    df['transcript'] = transcript

    df = df.sort_values(by='addressfname')
    df = df.reset_index(drop=True)
    logger.debug(df)

    return df


In [None]:
import logging
import os
import sys
import typing
from pathlib import Path
from formatter import Formatter
import tiktoken

# --- Logging -----------------------------------------------------------------
# The root logger is configured here; other modules import `logger` from this
# module. Records go to stdout through the project's custom Formatter.
logger = logging.getLogger()
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(Formatter())
logger.setLevel(logging.INFO)
logger.addHandler(handler)

# Directory that contains this module; every path below is derived from it.
dirname = Path(__file__).parent.resolve()

# Name of the Whisper model to load; assigned at runtime by main() before
# transcribe.transcribe() reads it.
whisper_model_name = None
whisper_model = None

# Transcripts with more tokens than this are split into chunks before embedding.
max_tokens = 500
# Engine name passed to the OpenAI embedding endpoint.
embedding_engine = 'text-embedding-ada-002'

# Number of cross-validation folds handed to classification.classify_embedding.
n_splits = 10

# --- Input data ----------------------------------------------------------------
data_dir = (dirname / "ADReSSo").resolve()

diagnosis_train_data = (
        data_dir / "diagnosis-train" / "diagnosis" / "train" / "audio").resolve()  # Dementia and control group
diagnosis_test_data = (
        data_dir / "diagnosis-test" / "diagnosis" / "test-dist" / "audio").resolve()  # Test data
# CSV with the training labels/scores; read by utils.add_train_scores.
diagnosis_train_scores = (
        data_dir / "diagnosis-train" / "diagnosis" / "train" / "adresso-train-mmse-scores.csv").resolve()
# Results template (has an 'ID' column) that each model's predictions are written into.
empty_test_results_file = (data_dir / "diagnosis-test" / "diagnosis" / "test-dist" / "test_results_task1.csv").resolve()
# CSV whose 'Dx' column is compared against the predictions in evaluate_similarity.
test_results_task1 = (data_dir / "task1.csv").resolve()


# Audio directories of the progression task (not referenced by the code visible here).
decline_data = (
        data_dir / "progression-train" / "progression" / "train" / "audio" / "decline").resolve()  
no_decline_data = (
        data_dir / "progression-train" / "progression" / "train" / "audio" / "no_decline").resolve()  


# --- Generated artefacts -----------------------------------------------------
transcription_dir = (dirname / "processed" / "transcription").resolve()
diagnosis_train_transcription_dir = (transcription_dir / "train").resolve()
diagnosis_test_transcription_dir = (transcription_dir / "test").resolve()
train_scraped_path = (dirname / "processed" / "train_scraped.csv").resolve()
test_scraped_path = (dirname / "processed" / "test_scraped.csv").resolve()
train_embeddings_path = (dirname / "processed" / "train_embeddings.csv").resolve()
test_embeddings_path = (dirname / "processed" / "test_embeddings.csv").resolve()

embedding_results_dir = (dirname / "results" / "embedding").resolve()
models_size_file = (embedding_results_dir / 'embedding_models_size.csv').resolve()


def set_up():
    """Load the tokenizer and make sure the output directories exist.

    Creates ``processed``, ``results``, the embedding results directory and its
    ``plots`` sub-directory (in that order) when they are missing.

    :return: the ``cl100k_base`` tiktoken encoding.
    """
    logger.info("Loading cl100k_base tokenizer...")
    logger.info(f"Max tokens per embedding: {max_tokens}.")
    tokenizer = tiktoken.get_encoding("cl100k_base")
    logger.info(f"Loading GPT embedding engine {embedding_engine}...")

    # Parents come before children because mkdir is called without parents=True.
    output_dirs = [
        Path(dirname / "processed").resolve(),
        Path(dirname / "results").resolve(),
        embedding_results_dir,
        Path(embedding_results_dir / 'plots').resolve(),
    ]
    for directory in output_dirs:
        directory.mkdir(exist_ok=True)

    return tokenizer


def secret_key() -> typing.Optional[str]:
    """Return the value of the ``OPENAI_API_KEY`` environment variable.

    A warning is logged when the variable is unset or empty; the value read
    from the environment (``None`` or ``''`` in that case) is returned as is.
    """
    api_key = os.environ.get('OPENAI_API_KEY')

    if api_key:
        return api_key

    logger.warning("Optional environment variable 'OPENAI_API_KEY' is missing.")
    return api_key


In [None]:
from matplotlib import pyplot as plt
import config
from config import logger
import pandas as pd
import openai


def tokenization(df, tokenizer):
    """Split transcripts that exceed ``config.max_tokens`` into smaller rows.

    Rows with a missing transcript (``None``/NaN) are dropped. A transcript
    that fits the limit is kept unchanged; a longer one is cut at sentence
    boundaries (``'. '``) into chunks, and each chunk becomes its own row that
    carries the other column values of the source row. A histogram of the
    token counts is shown before and after splitting.

    The frame passed in is not modified.

    :param df: frame with a ``transcript`` column.
    :param tokenizer: object whose ``encode(text)`` returns a sequence of tokens.
    :return: new frame with the columns of ``df`` plus ``n_tokens``.
    """
    df_columns = df.columns

    # A missing transcript cannot be encoded; drop those rows up front.
    # ``copy`` keeps the helper column below out of the caller's frame.
    df = df.dropna(subset=['transcript']).copy()
    df['n_tokens'] = df['transcript'].apply(lambda x: len(tokenizer.encode(x)))
    df['n_tokens'].hist()
    plt.show()

    def split_into_many(text, max_tokens):
        """Group the sentences of ``text`` into chunks within the ``max_tokens`` budget."""
        sentences = text.split('. ')

        chunks = []
        tokens_so_far = 0
        chunk = []

        for sentence in sentences:
            token = len(tokenizer.encode(" " + sentence))

            # Flush the running chunk when this sentence would overflow it.
            # The ``chunk`` guard avoids emitting an empty "." chunk.
            if chunk and tokens_so_far + token > max_tokens:
                chunks.append(". ".join(chunk) + ".")
                chunk = []
                tokens_so_far = 0

            # A single sentence above the limit cannot be placed in any chunk.
            if token > max_tokens:
                logger.warning(f"Skipping a sentence of {token} tokens (limit: {max_tokens}).")
                continue

            chunk.append(sentence)
            tokens_so_far += token + 1

        # Without this flush the sentences collected after the last overflow would be lost.
        if chunk:
            tail = ". ".join(chunk)
            # The final sentence usually still carries its own punctuation.
            chunks.append(tail if tail.endswith((".", "!", "?")) else tail + ".")

        return chunks

    shortened = []

    for _, row in df.iterrows():
        if row['n_tokens'] > config.max_tokens:
            row_chunks = split_into_many(row['transcript'], max_tokens=config.max_tokens)
            for chunk in row_chunks:
                columns_to_append = {col: row[col] for col in df_columns if col in row}
                columns_to_append['transcript'] = chunk
                shortened.append(columns_to_append)
        else:
            shortened.append({col: row[col] for col in df_columns})

    # Passing the columns keeps 'transcript' available even when no row is left.
    df_shortened = pd.DataFrame(shortened, columns=list(df_columns))
    df_shortened['n_tokens'] = df_shortened['transcript'].apply(lambda x: len(tokenizer.encode(x)))
    df_shortened['n_tokens'].hist()
    plt.show()

    logger.debug(df_shortened)
    return df_shortened


def create_embeddings(df):
    """Request one embedding per transcript and return the frame without the text.

    An ``embedding`` column is added to ``df`` itself; the returned frame is
    the same data with the ``transcript`` column removed.

    :param df: frame with a ``transcript`` column.
    :return: frame with an ``embedding`` column instead of ``transcript``.
    """
    def embed(text):
        # One API request per transcript, using the engine configured in config.
        response = openai.Embedding.create(input=text, engine=config.embedding_engine)
        return response['data'][0]['embedding']

    df['embedding'] = df['transcript'].apply(embed)
    return df.drop(columns='transcript')


def embeddings_exists():
    """Tell whether both the train and the test embeddings file are present.

    :return: ``True`` only when both configured paths are existing files.
    """
    required = (config.train_embeddings_path, config.test_embeddings_path)
    return all(path.is_file() for path in required)


In [None]:
import pickle

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from sklearn.dummy import DummyClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import (
    accuracy_score, make_scorer, recall_score, precision_score, f1_score
)
from sklearn.model_selection import (
    KFold, train_test_split, GridSearchCV, cross_validate
)
from sklearn.svm import SVC

import config
from config import logger

def embeddings_to_array(embeddings_file):
    """Read an embeddings CSV and turn the ``embedding`` column into NumPy arrays.

    Each cell of the ``embedding`` column is expected to hold the text form of
    a Python list of numbers (e.g. ``"[0.1, -0.2]"``).

    :param embeddings_file: path of the CSV file.
    :return: DataFrame whose ``embedding`` cells are ``numpy.ndarray`` objects.
    :raises ValueError, SyntaxError: when a cell is not a plain Python literal.
    """
    import ast

    df = pd.read_csv(embeddings_file)
    # literal_eval only accepts Python literals. The previous eval() would have
    # executed arbitrary expressions found in the CSV file.
    df['embedding'] = df['embedding'].apply(ast.literal_eval).apply(np.array)
    logger.debug(df.head())
    return df


def cross_validation(model, _X, _y, _cv):
    """Cross-validate ``model`` and summarise accuracy, precision, recall and F1.

    :param model: estimator to evaluate.
    :param _X: feature matrix.
    :param _y: target labels.
    :param _cv: cross-validation splitter (or number of folds).
    :return: dict that holds, for every metric ``m`` in ``accuracy``,
        ``precision``, ``recall`` and ``f1_score``: the per-fold scores
        (``train_m``/``test_m``), their means rounded to 3 decimals
        (``train_m_mean``/``test_m_mean``) and the rounded standard deviation
        of the training scores (``train_m_std``).
    """
    _scoring = {
        'accuracy': make_scorer(accuracy_score),
        'precision': make_scorer(precision_score, average='weighted'),
        'recall': make_scorer(recall_score, average='weighted'),
        'f1_score': make_scorer(f1_score, average='macro')
    }

    scores = cross_validate(estimator=model,
                            X=_X,
                            y=_y,
                            cv=_cv,
                            scoring=_scoring,
                            return_train_score=True)

    result = {}

    for metric in _scoring:
        # With return_train_score=True the training-fold scores are stored under
        # 'train_<metric>'; the held-out fold scores are under 'test_<metric>'.
        train_scores = scores[f'train_{metric}']
        test_scores = scores[f'test_{metric}']

        result[f'train_{metric}'] = train_scores
        result[f'train_{metric}_mean'] = round(train_scores.mean(), 3)
        result[f'train_{metric}_std'] = round(train_scores.std(), 3)

        result[f'test_{metric}'] = test_scores
        result[f'test_{metric}_mean'] = round(test_scores.mean(), 3)

    return result


def classify_embedding(train_data, test_data, _n_splits):
    """Tune, cross-validate, fit and apply SVC, LR and RF on the embeddings.

    For each model the function runs a grid search, cross-validates with the
    best parameters, plots the per-fold scores, fits on the full training set,
    predicts the test set and writes ``task1_<name>.csv``. Afterwards it writes
    the summary tables ``embedding_results.csv`` and ``config.models_size_file``.

    :param train_data: frame with the columns ``diagnosis`` and ``embedding``.
    :param test_data: frame with the columns ``addressfname`` and ``embedding``.
    :param _n_splits: number of folds for the shuffled ``KFold`` splitter.
    """
    logger.info("Initiating classification with GPT-3 text embeddings...")
    y_train = train_data['diagnosis'].values
    X_train = train_data['embedding'].to_list()
    X_test = test_data['embedding'].to_list()

    # Score of a stratified dummy classifier, reported as the baseline row.
    baseline_score = dummy_stratified_clf(X_train, y_train)
    logger.debug(f"Baseline performance of the dummy classifier: {baseline_score}")

    # `models` and `names` are parallel lists; the name selects the parameter grid.
    models = [SVC(), LogisticRegression(), RandomForestClassifier()]
    names = ['SVC', 'LR', 'RF']

    # The same splitter is shared by the grid search and the cross-validation.
    cv = KFold(n_splits=_n_splits, random_state=42, shuffle=True)

    results_df = pd.DataFrame(columns=['Set', 'Model', 'Accuracy', 'Precision', 'Recall', 'F1'])
    models_size_df = pd.DataFrame(columns=['Model', 'Size'])

    logger.info("Beginning to train models using GPT embeddings...")

    total_models_size = 0

    for model, name in zip(models, names):
        logger.info(f"Initiating {name}...")

        # Tune first, then evaluate the tuned configuration with cross-validation.
        best_params = hyperparameter_optimization(X_train, y_train, cv, model, name)
        model.set_params(**best_params)
        scores = cross_validation(model, X_train, y_train, cv)
        results_df = results_to_df(name, scores, results_df)

        visualize_results(_n_splits, name, scores, (config.embedding_results_dir / "plots").resolve())

        # Size of the pickled estimator, before and after fitting.
        model_size = len(pickle.dumps(model, -1))
        logger.debug(f"Model size of {name} before training: {model_size} bytes.")

        # Final fit on the complete training set; this model produces the test predictions.
        model.fit(X_train, y_train)

        model_size = len(pickle.dumps(model, -1))
        logger.debug(f"Model size of {name} after training: {model_size} bytes.")
        total_models_size += model_size

        models_size_df = pd.concat([models_size_df, pd.DataFrame([{'Model': name,
                                                                   'Size': f"{model_size} B",
                                                                   }])], ignore_index=True)

        # Results template; its 'ID' column is matched against the test file names below.
        model_test_results = pd.read_csv(config.empty_test_results_file)

        model_predictions = model.predict(X_test)

        filename_to_prediction = {}

        # NOTE(review): if test_data has several rows with the same addressfname,
        # the prediction of the last such row overwrites the earlier ones.
        for filename, prediction in zip(test_data['addressfname'], model_predictions):
            filename_to_prediction[filename] = 'ProbableAD' if prediction == 1 else 'Control'
        model_test_results['Prediction'] = model_test_results['ID'].map(filename_to_prediction)
        model_test_results_csv = (config.embedding_results_dir / f'task1_{name}.csv').resolve()
        model_test_results.to_csv(model_test_results_csv, index=False)
        logger.info(f"Writing {model_test_results_csv}...")
        evaluate_similarity(name, model_test_results)

    logger.info("Training using GPT embeddings done.")

    # Descending sort on 'Set' lists the 'Train' rows before the 'Test' rows.
    results_df = results_df.sort_values(by='Set', ascending=False)
    results_df = results_df.reset_index(drop=True)

    # The baseline row only has an accuracy; its other metric cells stay empty.
    results_df = pd.concat([results_df, pd.DataFrame([{'Set': 'Test',
                                                       'Model': 'Dummy',
                                                       'Accuracy': baseline_score,
                                                       }])], ignore_index=True)

    embedding_results_file = (config.embedding_results_dir / 'embedding_results.csv').resolve()
    results_df.to_csv(embedding_results_file)
    logger.info(f"Writing {embedding_results_file}...")

    logger.debug(f"Total size of all models: {total_models_size}.")
    models_size_df = pd.concat([models_size_df, pd.DataFrame([{'Model': 'Total',
                                                               'Size': f'{total_models_size} B',
                                                               }])], ignore_index=True)

    models_size_df.to_csv(config.models_size_file)
    logger.info(f"Writing {config.models_size_file}...")

    logger.info("Classification with GPT-3 text embeddings done.")


def evaluate_similarity(name, model_test_results):
    """Log the percentage of predictions that equal the reference diagnoses.

    The reference diagnoses are the ``Dx`` column of the CSV at
    ``config.test_results_task1``; they are compared element-wise with the
    ``Prediction`` column of ``model_test_results``.

    :param name: model name used in the log message.
    :param model_test_results: frame with a ``Prediction`` column.
    """
    reference = pd.read_csv(config.test_results_task1)['Dx']
    predicted = model_test_results['Prediction']

    hits = (reference == predicted).sum()
    share = hits / len(reference)
    similarity_percentage = share * 100

    logger.info(f"The similarity between the real and predicted diagnoses using model {name} "
                f"is {similarity_percentage:.2f}%.")


def hyperparameter_optimization(X_train, y_train, cv, model, name):
    """Grid-search the hyperparameters of ``model`` and return the best set.

    :param X_train: training features.
    :param y_train: training labels.
    :param cv: cross-validation splitter used by the grid search.
    :param model: estimator to tune.
    :param name: ``'SVC'``, ``'LR'`` or ``'RF'``; selects the parameter grid.
    :return: ``best_params_`` of the fitted ``GridSearchCV``.
    :raises ValueError: if ``name`` is not one of the supported model names.
    """
    lr_param_grid, rf_param_grid, svc_param_grid = param_grids()
    grids_by_name = {
        'SVC': svc_param_grid,
        'LR': lr_param_grid,
        'RF': rf_param_grid,
    }

    # Fail early with a clear message instead of calling .fit on None.
    if name not in grids_by_name:
        raise ValueError(f"Unknown model name {name!r}; expected one of {sorted(grids_by_name)}.")

    grid_search = GridSearchCV(estimator=model,
                               param_grid=grids_by_name[name],
                               cv=cv,
                               n_jobs=-1,
                               error_score=0.0)
    grid_search.fit(X_train, y_train)
    return grid_search.best_params_


def param_grids():
    """Build the grid-search parameter grids of the three classifiers.

    :return: tuple ``(lr_param_grid, rf_param_grid, svc_param_grid)``; the LR
        entry is a list of two grids (one per solver), the others are dicts.
    """
    def regularisation_strengths():
        # 20 log-spaced values of C between 1e-4 and 1e4.
        return np.logspace(-4, 4, 20)

    liblinear_grid = dict(penalty=['l1', 'l2'],
                          C=regularisation_strengths(),
                          solver=['liblinear'],
                          max_iter=[100, 200, 500, 1000])
    lbfgs_grid = dict(penalty=['l2'],
                      C=regularisation_strengths(),
                      solver=['lbfgs'],
                      max_iter=[200, 500, 1000])
    lr_param_grid = [liblinear_grid, lbfgs_grid]

    rf_param_grid = dict(n_estimators=[25, 50, 100, 150],
                         max_features=['sqrt', 'log2', None],
                         max_depth=[3, 6, 9],
                         max_leaf_nodes=[3, 6, 9])

    svc_param_grid = dict(C=[0.1, 1, 10, 100],
                          gamma=[1, 0.1, 0.01, 0.001],
                          kernel=['rbf', 'poly', 'sigmoid'])

    return lr_param_grid, rf_param_grid, svc_param_grid


def visualize_results(_n_splits, name, results, save_dir):
    """Plot the per-fold train/test scores of one model, one figure per metric.

    :param _n_splits: number of folds, used in the plot titles.
    :param name: model name; used as x-axis label and in the file names.
    :param results: mapping with ``train_<metric>``/``test_<metric>`` entries
        for ``accuracy``, ``precision``, ``recall`` and ``f1_score``.
    :param save_dir: ``Path`` of the directory the PNG files are written to.
    """
    # (y-axis label, title prefix, key suffix in `results`, tag in the file name)
    panels = (
        ("Accuracy", "Accuracy scores", "accuracy", "accuracy"),
        ("Precision", "Precision scores", "precision", "precision"),
        ("Recall", "Recall scores", "recall", "recall"),
        ("F1", "F1 Scores", "f1_score", "f1"),
    )

    targets = [(save_dir / f'plot_{file_tag}_{name}.png').resolve()
               for _, _, _, file_tag in panels]

    for (y_label, title_prefix, key, _), target in zip(panels, targets):
        plot_result(name,
                    y_label,
                    f"{title_prefix} in {_n_splits} Folds",
                    results[f"train_{key}"],
                    results[f"test_{key}"],
                    target)


def results_to_df(name, scores, results_df):
    """Append the 'Train' and 'Test' summary rows of one model to ``results_df``.

    The train row shows each metric as ``"<mean> (<std>)"``; the test row holds
    the plain mean values.

    :param name: model name stored in the ``Model`` column.
    :param scores: mapping with ``train_<m>_mean``, ``train_<m>_std`` and
        ``test_<m>_mean`` for ``accuracy``, ``precision``, ``recall``, ``f1_score``.
    :param results_df: frame collected so far.
    :return: new frame with the two rows appended and a fresh 0..n-1 index.
    """
    # Column title -> metric name used in the keys of `scores`.
    metric_of_column = {
        'Accuracy': 'accuracy',
        'Precision': 'precision',
        'Recall': 'recall',
        'F1': 'f1_score',
    }

    train_row = {'Set': 'Train', 'Model': name}
    test_row = {'Set': 'Test', 'Model': name}
    for column, metric in metric_of_column.items():
        train_row[column] = f"{scores[f'train_{metric}_mean']} ({scores[f'train_{metric}_std']})"
        test_row[column] = scores[f'test_{metric}_mean']

    new_rows = pd.DataFrame([train_row, test_row])
    return pd.concat([results_df, new_rows], ignore_index=True)

def plot_result(x_label, y_label, plot_title, train_data, val_data, savefig_path=None):
    """Draw a grouped bar chart of per-fold training and validation scores.

    One pair of bars is drawn per fold; the number of folds is taken from
    ``len(train_data)``, so any fold count can be plotted.

    :param x_label: label of the x-axis.
    :param y_label: label of the y-axis.
    :param plot_title: figure title.
    :param train_data: sequence with one training score per fold.
    :param val_data: sequence with one validation score per fold (same length).
    :param savefig_path: optional file path the figure is saved to.
    """
    def ordinal(number):
        """Return ``number`` with its English ordinal suffix, e.g. 1 -> '1st'."""
        if 10 <= number % 100 <= 20:
            suffix = 'th'
        else:
            suffix = {1: 'st', 2: 'nd', 3: 'rd'}.get(number % 10, 'th')
        return f"{number}{suffix}"

    labels = [f"{ordinal(fold)} Fold" for fold in range(1, len(train_data) + 1)]

    fig = plt.figure(figsize=(12, 6))
    X_axis = np.arange(len(labels))
    plt.ylim(0.40000, 1)
    plt.bar(X_axis - 0.2, train_data, 0.4, color='blue', label='Training')
    plt.bar(X_axis + 0.2, val_data, 0.4, color='red', label='Validation')
    plt.title(plot_title, fontsize=30)
    plt.xticks(X_axis, labels)
    plt.xlabel(x_label, fontsize=14)
    plt.ylabel(y_label, fontsize=14)
    plt.legend()
    plt.grid(True)

    # Save before showing: depending on the backend, show() may release the
    # figure, and saving afterwards can then produce an empty image.
    if savefig_path is not None:
        fig.savefig(savefig_path, dpi=fig.dpi)
    plt.show()
    # This function is called once per metric and model; close the figure so
    # open figures do not pile up.
    plt.close(fig)


def dummy_stratified_clf(X, y, random_state=42):
    """Score a stratified dummy classifier as a chance-level baseline.

    The data is split with ``train_test_split``, a
    ``DummyClassifier(strategy='stratified')`` is fitted on the training part
    and scored on the held-out part.

    :param X: feature matrix.
    :param y: target labels.
    :param random_state: seed for both the split and the dummy classifier's
        random predictions, so that the baseline is reproducible.
    :return: score on the held-out part, rounded to 3 decimals.
    """
    X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=random_state)

    # Without a seed the 'stratified' strategy draws different predictions on
    # every run, which makes the reported baseline fluctuate.
    stratified_clf = DummyClassifier(strategy='stratified', random_state=random_state)
    stratified_clf.fit(X_train, y_train)

    return round(stratified_clf.score(X_test, y_test), 3)


In [None]:
import openai
import pandas as pd
import classification
import config
import embedding
import transcribe
from config import logger
from utils import get_user_input, df_to_csv


def main():
    """Interactive entry point: optionally transcribe, embed and classify.

    The user is asked whether to transcribe the audio (and with which Whisper
    model) and whether to run the classification. Embeddings are created when
    they are missing, or re-created on request when they already exist.
    """
    openai.api_key = config.secret_key()

    tokenizer = config.set_up()

    yes_choices = ["yes", "y"]
    no_choices = ["no", "n"]

    def confirmed(question):
        # True when the user answers the yes/no question affirmatively.
        return get_user_input(question, yes_choices + no_choices) in yes_choices

    # --- Transcription ---------------------------------------------------
    if confirmed("Would you like to transcribe the audio files? (yes/no): "):
        logger.info("If there is already a transcription, please delete it first. "
                    "Otherwise, already transcribed files will be skipped, no matter which model was used for it.")
        whisper_model_choices = ["tiny", "base", "small", "medium", "large"]
        config.whisper_model_name = get_user_input("Which Whisper model should be used for transcription? "
                                                   "(tiny/base/small/medium/large): ", whisper_model_choices)
        transcribe.transcribe()
    else:
        logger.info("Transcription skipped.")

    # --- Classification --------------------------------------------------
    if not confirmed("Would you like the classification to be (re-)run? (yes/no): "):
        logger.info("Classification skipped.")
        return

    if embedding.embeddings_exists():
        create_embeddings = confirmed("There already seem to exist some embeddings. "
                                      "Would you like to create new embeddings? (yes/no): ")
        if not create_embeddings:
            logger.info("Embedding skipped.")
    else:
        create_embeddings = True
        logger.info("Embeddings not found. Creating embeddings automatically...")

    if create_embeddings:
        logger.info("Initiating embedding...")
        train_df = pd.read_csv(config.train_scraped_path)
        test_df = pd.read_csv(config.test_scraped_path)

        train_tokenization = embedding.tokenization(train_df, tokenizer)
        test_tokenization = embedding.tokenization(test_df, tokenizer)

        train_embeddings = embedding.create_embeddings(train_tokenization)
        test_embeddings = embedding.create_embeddings(test_tokenization)

        df_to_csv(train_embeddings, config.train_embeddings_path)
        df_to_csv(test_embeddings, config.test_embeddings_path)

        logger.info("Embedding done.")

    # The embeddings are always read back from disk, freshly created or not.
    train_embeddings_array = classification.embeddings_to_array(config.train_embeddings_path)
    test_embeddings_array = classification.embeddings_to_array(config.test_embeddings_path)

    classification.classify_embedding(train_embeddings_array, test_embeddings_array, config.n_splits)


if __name__ == "__main__":
    main()
