In [1]:
# Загрузка датасета
def getting_dataset() -> None:
    '''The result of executing this function is a dataset downloaded into the directory "Downloads"'''

    # Импорт библиотек
    from selenium import webdriver
    from selenium.webdriver.common.by import By
    from selenium.webdriver.common.keys import Keys
    from selenium.webdriver.support.ui import WebDriverWait
    from selenium.webdriver.support import expected_conditions as EC
    from selenium.webdriver.chrome.options import Options
    from urllib.parse import urljoin
    import time

    # Инициализируем WebDriver:
    USER_AGENT = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 YaBrowser/24.1.0.0 Safari/537.36'
    chrome_options = Options()
    chrome_options.add_argument(f'user-agent={USER_AGENT}')
    driver = webdriver.Chrome()

    main_url = 'https://www.kaggle.com'
    sign_in_url = 'https://www.kaggle.com/account/login'
    # Датасет находится на странице: https://www.kaggle.com/datasets/chaitanyakck/medical-text/data
    dataset_url = "https://www.kaggle.com/datasets/chaitanyakck/medical-text/data"

    try:
        # Перейдём на страницу входа в аккаунт Kaggle для авторизированного скачивания датасета
        driver.get(sign_in_url)

        # Нажмём на кнопку "Sign in with Google" для авторизации (подразумевая, что аккаунт уже зарегистрирован)
        google_sign_in_button = WebDriverWait(driver, 10).until(
            EC.element_to_be_clickable((By.XPATH, '//button[contains(., "Sign in with Google")]'))
            )        
        google_sign_in_button.click()

        # Добавим ожидание с целью снижения нагрузки на сервер.
        time.sleep(3)

        try:
            # Проверим, что мы вошли в аккаунт и можем скачать датасет под своим аккаунтом.
            if driver.find_element((By.XPATH, '//h1[contains(., "Welcome")]')):

                # Теперь мы вошли в систему и можем переходить к дальнейшим действиям.
                # Откроем веб-страницу с датасетом:
                driver.get(dataset_url)

                # Найдём кнопку "Download":
                download_button = driver.find_element(By.XPATH, '//button[contains(., "file_download")]')
                # Если кнопка найдена, нажмём на неё
                if download_button:
                    download_button.click()
                # В противном случае воспользуемся альтернативным способом получения датасета - 
                # непосредственным ереходом по ссылке загрузки датасета
                else:
                    # найдём элемент, в котором содержится относительная ссылка
                    href_element = driver.find_element(By.XPATH, '//div[@class="sc-emfenM sc-fnpAPw cvuSKw gzjyQr"]/a')
                    # извлечём относительную ссылку
                    rel_link = href_element.get_attribute('href')
                    # составим абсолютный путь на скачивание архива
                    ds_download_link = urljoin(main_url, rel_link)
                    # перейдём по прямой ссылке загрузки
                    driver.get(ds_download_link)


                # Можно использовать аналогичный код:
                # В этом варианте кода заменён time.sleep() на явные ожидания WebDriverWait(), которые, возможно, являются более надежными.
                
                # try:
                #     download_button = WebDriverWait(driver, 10).until(
                #         EC.element_to_be_clickable((By.XPATH, '//button[contains(., "file_download")]'))
                #     )
                #     download_button.click()
                # except Exception as e:
                #     href_element = WebDriverWait(driver, 10).until(
                #         EC.presence_of_element_located((By.XPATH, '//div[@class="sc-emfenM sc-fnpAPw cvuSKw gzjyQr"]/a'))
                #     )
                #     rel_link = href_element.get_attribute('href')
                #     ds_download_link = urljoin(main_url, rel_link)
                #     driver.get(ds_download_link)

                # WebDriverWait(driver, 10).until(
                #     EC.presence_of_element_located((By.CLASS_NAME, "download-modal"))
                # )

                # Также, можно добавить контекстный менеджер with для инициализации и автоматического закрытия WebDriver после завершения работы функции.

        except Exception as e:
            print(f'Произошла ошибка в процессе поиска элемента на странице - {e}')
        # Подождём, пока загрузится файл:
        # Используем ожидание появления элемента с определенным классом, указывающим на загрузку
        WebDriverWait(driver, 10).until(
            EC.presence_of_element_located((By.CLASS_NAME, "download-modal"))
        )
    except Exception as E:
        print(f'Произошла ошибка в процессе авторизации - {E}')
    # Закроем браузер в любом случае
    finally:
        driver.quit()


# if __name__ == "__main__":
#     getting_dataset()       

In [12]:
# Разархивирование загруженного архива в рабочую директорию
def unzip_and_replace_dataset(zip_path: str ="C:\\Users\\Allen\\Downloads\\archive.zip", 
                              extract_to: str = "D:\\GeekBrains\\Data_engineer_Diploma_project") -> None:
    '''The function unzips the downloaded archive into the working directory'''

    # Импорт библиотек
    import zipfile
    import os

    # Проверка существования файла
    if not os.path.exists(zip_path):
        print(f"The file {zip_path} does not exist.")
        return

    # Разархивирование архива
    try:
        with zipfile.ZipFile(zip_path, "r") as zip_ref:
            zip_ref.extractall(extract_to)
        print(f"Archive extracted to {extract_to}")
    except zipfile.BadZipFile:
        print("The file is a bad zip file and cannot be extracted.")
    except Exception as e:
        print(f"An error occurred: {e}")


# if __name__ == "__main__":
#     zip_file_path = "C:\\Users\\Allen\\Downloads\\archive.zip"
#     destination_directory = "D:\\GeekBrains\\Data_engineer_Diploma_project"
#     unzip_and_replace_dataset(zip_file_path, destination_directory)  

In [13]:
# Преобразование датасетов 
def transforming_datasets(test_path: str = "D:\\GeekBrains\\Data_engineer_Diploma_project\\test.dat", 
                          train_path: str = "D:\\GeekBrains\\Data_engineer_Diploma_project\\train.dat", 
                          test_csv_path: str = "D:\\GeekBrains\\Data_engineer_Diploma_project\\ma_test.csv", 
                          train_csv_path: str = "D:\\GeekBrains\\Data_engineer_Diploma_project\\ma_train.csv") -> None:
    '''The function opens downloaded files, generates datasets adapted to processing based on them, and saves new datasets in .csv format'''

    # Импорт библиотек
    import pandas as pd

    # В архиве датасеты (тренировочный и тестовый) содержатся в формате .dat
    # поэтому, нам нужно их переформатировать в датасеты, пригодные и удобные для дальнейшего использования
    # Чтение файла .dat

    df_test = pd.read_fwf(test_path, sep='\t', header=None)
    df_train = pd.read_fwf(train_path, sep='\t', header=None)

    # Датафрейм df_test имеет атипичную ненормализованную структуру - всего 101 столбец, все аннотации содержатся в первом столбце, 
    # остальные колонки пустые, поэтому нам нужно создать датафрейм только из первой колонки.
    # Датафрейм df_train имеет схожую структуру - 101 столбец, первая колонка - классы заболеваний,
    # все аннотации содержатся во втором столбце, остальные колонки пустые, 
    # поэтому нам нужно создать датафрейм из первых двух столбцов.

    # Трансформируем df_test в датасет формата .csv:
    # Выбор только первого столбца
    first = df_test.iloc[:, 0]

    # Запись данных первого столбца в файл .csv с заголовком
    first.to_csv(test_csv_path, index=False, header=['abstracts'])

    # Теперь преобразуем df_train:
    # Выбор первых двух сколонок
    first_two = df_train.iloc[:, :2]

    # Запись данных столбцов в файл .csv с заголовками
    first_two.to_csv(train_csv_path, index=False, header=['labels', 'abstracts'])


# if __name__ == "__main__":
#     test_dat_path = "D:\\GeekBrains\\Data_engineer_Diploma_project\\test.dat"
#     train_dat_path = "D:\\GeekBrains\\Data_engineer_Diploma_project\\train.dat"
#     test_csv_path = "D:\\GeekBrains\\Data_engineer_Diploma_project\\ma_test.csv"
#     train_csv_path = "D:\\GeekBrains\\Data_engineer_Diploma_project\\ma_train.csv"
#     transforming_datasets(test_dat_path, train_dat_path, test_csv_path, train_csv_path)

In [None]:
def prepare_to_labeling(input_csv: str = 'ma_train.csv', manual_label_csv: str = 'manual_label_sample.csv', 
                        rule_based_csv: str = 'rule_based_sample.csv', train_size: float = 0.01):
    """
    Divides the dataframe into two parts for manual markup and for automatic rule-based markup.
    
    Parameters:
    input_csv (str): Путь к исходному файлу CSV.
    manual_label_csv (str): Путь к файлу CSV для ручной разметки.
    rule_based_csv (str): Путь к файлу CSV для разметки на основе правил.
    train_size (float): Доля датафрейма для ручной разметки.
    """
    
    # Импорт библиотек
    import pandas as pd
    from sklearn.model_selection import train_test_split  

    # Создание датафрейма Pandas из файла .csv
    df_train = pd.read_csv(input_csv, engine='python', encoding='utf-8', on_bad_lines='skip', encoding_errors='ignore')
    # Разделение датафрейма на две части - для ручной разметки и для разметки на основе правил
    manual_label_sample, rule_based_sample = train_test_split(df_train, train_size=train_size, random_state=42)

    # Сохранение датафреймов в файлы .csv для дальнейшей обработки
    manual_label_sample.to_csv(manual_label_csv, index=False)
    rule_based_sample.to_csv(rule_based_csv, index=False)

In [None]:

import numpy as np
from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.model_selection import train_test_split                # разделение данных на обучающую и тестовую части
from sklearn.feature_extraction.text import TfidfVectorizer         # преобразование текста в вектор
from sklearn.linear_model import LogisticRegression                 # использование модели логистической регрессии
from sklearn.metrics import accuracy_score, classification_report   # оценка производительности модели
from sklearn.pipeline import Pipeline                               # конвеер обработки данных
from sklearn.metrics import f1_score
from sklearn.utils import shuffle

In [None]:
# Проводим разметку на основе правил.
def rule_for_labeling(text: str) -> str:
    '''The function defines a rule for assigning a label to the text and performs markup'''
    
    # Определяем списки с ключевыми значениями по каждой из четырех категорий 
    neoplasms_list = [
        'neoplas', 'tumor', 'cancer', 'lymphom', 'blastoma', 'malign', 'benign', 'melanom', 'leukemi', 'metasta', 'carcinom', 'sarcoma', 'glioma',
        'adenoma', 'chemotherapy', 'radiotherapy', 'oncology', 'carcinogenesis', 'mutagen', 'angiogenesis', 'radiation', 'immunotherapy', 'biopsy',
        'brachytherapy', 'metastasis', 'prognosis', 'biological therapy', 'carcinoma', 'myeloma', 'genomics', 'immunology', 'cell stress',
        'oncogene', 'tumorigenesis', 'cytology', 'histology', 'oncologist', 'neoplasm', 'oncogenic', 'tumor suppressor genes', 'malignancy',
        'cancerous', 'non-cancerous', 'solid tumor', 'liquid tumor', 'tumor marker', 'oncogenesis', 'tumor microenvironment', 'carcinogenesis', 
        'adenocarcinoma', 'squamous cell carcinoma'
    ]

    digestive_list = [
        'digestive', 'esophag', 'stomach', 'gastr', 'liver', 'cirrhosis', 'hepati', 'pancrea', 'intestin', 'sigmo', 'recto', 'rectu', 'cholecyst', 
        'gallbladder', 'portal pressure', 'portal hypertension', 'appendic', 'ulcer', 'bowel', 'dyspepsia', 'colitis', 'enteritis', 'gastroenteritis', 
        'endoscopy', 'colonoscopy', 'peptic', 'gastrointestinal', 'abdominal', 'dysphagia', 'diverticulitis', 'irritable bowel syndrome', 
        'inflammatory bowel disease', 'gastroesophageal reflux', 'celiac disease', 'crohn\'s disease', 'ulcerative colitis',
        'gastroscopy', 'biliary', 'esophageal', 'gastritis', 'hepatic', 'lactose intolerance', 'gastroenterologist', 'digestion', 'absorption', 
        'malabsorption', 'intestinal flora', 'microbiota', 'probiotics', 'prebiotics', 'dietary fiber', 'nutrition'
    ]

    neuro_list = [
        'neuro', 'nerv', 'reflex', 'brain', 'cerebr', 'white matter', 'subcort', 'plegi', 'intrathec', 'medulla', 'mening', 'epilepsy', 
        'multiple sclerosis', 'parkinson\'s disease', 'alzheimer\'s disease', 'seizure', 'paresthesia', 'dementia', 'encephalopathy', 
        'neuropathy', 'neurodegeneration', 'stroke', 'cerebral', 'spinal cord', 'neurotransmitter', 'synapse', 'neuralgia', 'neurology', 
        'neurosurgery', 'neurooncology', 'neurovascular', 'autonomic nervous system', 'central nervous system', 'peripheral nervous system', 
        'brain injury', 'concussion', 'traumatic brain injury', 'spinal injury', 'neurological disorder', 'neurodevelopmental disorders',
        'neurodegenerative disorders', 'neuroinflammation', 'neuroimaging', 'neuroscience', 'neurophysiology', 'neurotransmission', 
        'neuroplasticity', 'neurogenesis', 'neuroendocrinology', 'neuropsychology', 'neurotoxicity', 'neuromodulation', 'neuroprotection', 
        'neuropathology'
    ]

    cardio_list = [
        'cardi', 'heart', 'vascul', 'embolism', 'stroke', 'reperfus', 'thromboly', 'ischemi', 'hypercholesterolemia', 'hyperten', 'blood pressure', 
        'valv', 'ventric', 'aneurysm', 'coronar', 'arter', 'aort', 'electrocardiogra', 'arrhythm', 'clot', 'mitral', 'endocard', 'hypertension', 
        'myocardial', 'infarction', 'cardiover', 'fibrillat', 'bypass', 'pericarditis', 'cardiomyopathy', 'hypotension', 'angiography', 'stenting', 
        'cardiac catheterization', 'vascular', 'echocardiogram', 'cardiogenic', 'angioplasty', 'cardiac arrest', 'heart failure', 
        'cardiac rehabilitation', 'electrophysiology', 'heart valve disease', 'cardiopulmonary', 'cardiothoracic surgery', 'vascular surgery', 
        'cardiovascular disease', 'cardiovascular health', 'cardiovascular risk', 'cardiovascular system', 'cardioprotection', 'cardiovascular imaging', 
        'cardiovascular physiology', 'cardiovascular pharmacology', 'cardiovascular intervention', 'cardiovascular diagnostics', 'cardiovascular genetics'
    ]

    # Приведем текст аннотаций к нижнему регистру
    row = text.lower()
    
    # В используемом датасете используется следующая маркировка:
    # neoplasms = 1
    # digestive system diseases = 2
    # nervous system diseases = 3
    # cardiovascular diseases = 4
    # general pathological conditions = 5

    # Создаём словарь в котором ключи - категории заболеваний, а значения - количество ключевых значений в тексте по каждой категории
    res_dict = {
        '1': 0,
        '2': 0,
        '3': 0,
        '4': 0
    }
    # Рассчитываем количество ключевых значений в тексте и заполняем словарь
    for p in neoplasms_list:
        res_dict['1'] += row.count(p)
    for d in digestive_list:
        res_dict['2'] += row.count(d)
    for n in neuro_list:
        res_dict['3'] += row.count(n)
    for c in cardio_list:
        res_dict['4'] += row.count(c)
    
    # Рассчитываем наиболее часто встречаемую категорию в тексте и её отношение ко всем выявленным значения по всем категориям.
    # Для отнесения текста к определенной категории его доля должна превышать условно взятое значение - 0,3.
    # Если не превышает, то текст будет отнесён к категории 'general pathological conditions' и ему будет присвоена метка - 5
    most_frequent = max(res_dict.values())
    divisor = sum(res_dict.values())
    if divisor > 0 and (most_frequent / divisor) > 0.3:
        for key, value in res_dict.items(): 
            if value == most_frequent:
                return int(key)
    else:
        return int(5)

In [None]:
def rule_based_labeling() -> pd.DataFrame:

    # Импорт библиотеки
    import pandas as pd
    from text_clas_pkg import rule_for_labeling

    # Создание датафрейма Pandas
    df_rule = pd.read_csv('rule_based_sample.csv')

    # Разметка датафрейма - добавляем в датафрейм колонку в которой 
    # будут метки на онове определенного нами правила
    df_rule['labeled condition name'] = df_rule['medical_abstract'].apply(rule_for_labeling)

    return df_rule

In [None]:
# Ручную разметку выборки выдержек из медицинских статей в размере 144 шт (0,01 от всего датасета) я провел с использованием Label Studio. 
# Результат разметки сохранен в текущую директорию с именем ls_manual_labeled.csv.

# Далее необходимо объединить два размеченных датасета. Для этого загрузим размеченный вручную датасет и посмотрим его структуру.

In [None]:

def merging_labeled_dfs(df_rule: pd.DataFrame) -> pd.DataFrame:
    '''The function combines the date frames obtained as a result of automatic 
    rule-based markup and manual markup and brings the combined dataframe to the 
    form in which it will be used to train the model'''
    
    # Импорт библиотек
    import pandas as pd
    import os

    # Проверка наличия датасета размеченного вручную в текущей директории
    # Поскольку процесс будет выполняться автоматизированно, этап ручной разметки может быть исключен из процесса,
    # либо выполняться не при каждом запуске процесса
    dset_name = 'ls_manual_labeled.csv'  
    dset_exists = os.path.exists(dset_name)

    if dset_exists:
        df_manual = pd.read_csv('ls_manual_labeled.csv')
        # Для объединения датасетов приведем датасет созданный Label Studio к соответствующему виду:
        df_manual.drop(['annotation_id', 'annotator', 'created_at', 'id', 'lead_time', 'updated_at'], axis=1, inplace=True)
        df_manual.rename(columns={'sentiment': 'labeled condition name'}, inplace=True)
        
    # Теперь объединим датасеты:
    df_merged = pd.concat([df_rule, df_manual])

In [None]:
# Для удобства дальнейшей работы удалим колонку condition_label, а так же добавим числовые значения, соответствующие размеченным нами данным.

df_merged.drop(['condition_label'], axis=1, inplace=True)
label_map = {"neoplasms": 1, "digestive system diseases": 2, "nervous system diseases": 3, "cardiovascular diseases": 4, "general pathological conditions": 5}
df_merged['new condition label'] = df_common['labeled condition name'].map(label_map)

In [None]:
# Переходим к обучению модели.
# Для начала, ещё раз перемешаем датасет.

df_common = shuffle(df_common)

In [None]:
# Разделим датасет на 2 части.

labeled, unlabeled = train_test_split(df_common, test_size=0.2, random_state=42)

In [None]:
# Создадим функцию для обучения модели

def train_model(labeled):
    vectorizer = TfidfVectorizer()
    x = vectorizer.fit_transform(labeled['medical_abstract'])
    y = labeled['new condition label']

    model = LogisticRegression(max_iter=12000)
    model.fit(x, y)

    return model, vectorizer

In [None]:
# Обучение модели на выборке из размеченного датасета

model, vectorizer = train_model(labeled=labeled)

In [None]:
# Использование модели на "неразмеченной" выборке


x_unlabeled = vectorizer.transform(unlabeled['medical_abstract'])
y_unlabeled_predicted = model.predict(x_unlabeled)

In [None]:
# Расчёт энтропии предсказаний


y_unlabeled_probe = model.predict_proba(x_unlabeled)
uncertainty = -(y_unlabeled_probe * np.log2(y_unlabeled_probe)).sum(axis=1)

In [None]:
# Выбор 100 наиболее неопределенных точек (для маркировки человеком)


labeled_new = unlabeled.iloc[uncertainty.argsort()[:100]]
unlabeled_new = unlabeled.iloc[uncertainty.argsort()[100:]]

In [None]:
# Разметка новых точек и добавление их к размеченному датасету


labeled = pd.concat([labeled, labeled_new])

In [None]:
# Переобучение модели на расширенном размеченном множестве

model, vectorizer = train_model(labeled)

In [None]:
# Для оценки эффективности загрузим тестовый датасет

test_df = pd.read_csv('medical_tc_test.csv', engine='python', on_bad_lines='skip')

In [None]:
# Запустим модель

x_test = vectorizer.transform(test_df['medical_abstract'])
y_test_predicted = model.predict(x_test)

In [None]:
# Оценим эффективность

f1 = f1_score(test_df['condition_label'], y_test_predicted, average='micro')
print(f1)

In [None]:
# Вариант 2

# На объединенном размеченном датасете:
X = df_common['medical_abstract']
Y = df_common['new condition label']

vectorizer2 = TfidfVectorizer()
X_vectorized = vectorizer2.fit_transform(X)

X_train, X_test, Y_train, Y_test = train_test_split(X_vectorized, Y, test_size=0.3, random_state=42)

model2 = LogisticRegression(max_iter=15000)
model2.fit(X_train, Y_train)


Y_test_predicted = model2.predict(X_test)

accuracy = accuracy_score(Y_test, Y_test_predicted)
print(accuracy)


# И ещё раз на тестовом датасете
X2 = test_df['medical_abstract']
Y2 = test_df['condition_label']

vectorizer3 = TfidfVectorizer()
X2_vectorized = vectorizer3.fit_transform(X2)

X2_train, X2_test, Y2_train, Y2_test = train_test_split(X2_vectorized, Y2, test_size=0.3, random_state=42)

model3 = LogisticRegression(max_iter=15000)
model3.fit(X2_train, Y2_train)


Y2_test_predicted = model3.predict(X2_test)

accuracy = accuracy_score(Y_test, Y_test_predicted)
print(accuracy)