In [343]:
import torch

In [260]:
from sentence_transformers import SentenceTransformer, util
from transformers import AutoTokenizer, AutoModel


query = "что должны обеспечивать работники железнодорожного транспорта?"

docs = [
    "Работники железнодорожного транспорта в соответствии со своими должностными обязанностями должны обеспечивать \
    выполнение Правил и приложений к ним, безопасность движения и эксплуатации железнодорожного транспорта.", 
    "Ответственными за содержание и (или) исправное техническое состояние железнодорожных путей, сооружений и устройств железнодорожного транспорта \
    с обеспечением периодичности выполнения ремонтов, установленных нормативной технической документацией, являются работники железнодорожного транспорта, \
    непосредственно их обслуживающие.",
    "Работники железнодорожного транспорта обязаны соблюдать правила и нормы по охране труда, промышленной, экологической, \
    пожарной безопасности, санитарно-эпидемиологические правила и нормативы в соответствии со своими должностными обязанностями и должностными инструкциями.",
    "Доступ на локомотивы, в кабины управления мотор-вагонного подвижного состава <18>, к специальному самоходному подвижному составу <19>, \
    к сигналам, железнодорожным стрелкам <20> (далее - стрелка), аппаратам, механизмам и другим устройствам, связанным с обеспечением безопасности движения \
    и эксплуатации железнодорожного транспорта, в помещения, из которых производится управление сигналами и указанными устройствами, имеют работники железнодорожного \
    транспорта, в случае, если нахождение работников железнодорожного транспорта на указанных объектах предусмотрено их должностными обязанностями. Запрещается доступ \
    посторонних лиц на указанные в настоящем пункте объекты.",
    "В соответствии с пунктом 3 статьи 25 Федерального закона 'О железнодорожном транспорте в Российской Федерации' <25> лица, принимаемые на работу, непосредственно \
    связанную с движением поездов и маневровой работой, и работники, выполняющие такую работу и (или) подвергающиеся воздействию вредных и опасных производственных факторов, \
    проходят за счет средств работодателей обязательные предварительные (при поступлении на работу) и периодические (в течение трудовой деятельности) медицинские осмотры, \
    включающие в себя химико-токсикологические исследования наличия в организме человека наркотических средств, психотропных веществ и их метаболитов в соответствии с Порядком \
    проведения обязательных предварительных (при поступлении на работу) и периодических (в течение трудовой деятельности) медицинских осмотров на железнодорожном транспорте, \
    утвержденным приказом Министерства транспорта Российской Федерации от 19 октября 2020 г. N 428 <26>.",
    "Работники железнодорожного транспорта, производственная деятельность которых связана с движением поездов \
    и маневровой работой на железнодорожных путях общего пользования, должны проходить аттестацию, предусматривающую проверку \
    знаний Правил, инструкций по организации движения поездов и маневровой работы, по сигнализации на железнодорожном транспорте, \
    и иных нормативных правовых актов федерального органа исполнительной власти в области железнодорожного транспорта <29>",
    ]

docs = [doc.lower() for doc in docs]

def mean_pooling(model_output, attention_mask):
    token_embeddings = model_output[0] #First element of model_output contains all token embeddings
    input_mask_expanded = attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float()
    sum_embeddings = torch.sum(token_embeddings * input_mask_expanded, 1)
    sum_mask = torch.clamp(input_mask_expanded.sum(1), min=1e-9)
    return sum_embeddings / sum_mask

tokenizer = AutoTokenizer.from_pretrained("ai-forever/sbert_large_nlu_ru")
model = AutoModel.from_pretrained("ai-forever/sbert_large_nlu_ru")

#Tokenize sentences
query_tok = tokenizer(query, padding=True, truncation=True, max_length=24, return_tensors='pt')
doc_tok = tokenizer(docs, padding=True, truncation=True, max_length=24, return_tensors='pt')

with torch.no_grad():
    query_emb = model(**query_tok)
    doc_emb = model(**doc_tok)

query_emb = mean_pooling(query_emb, query_tok['attention_mask'])
doc_emb = mean_pooling(doc_emb, doc_tok['attention_mask'])

# --------------------------------------------------------------------

#Compute dot score between query and all document embeddings
scores = util.cos_sim(query_emb, doc_emb)[0]

#Combine docs & scores
doc_score_pairs = list(zip(docs, scores))

#Sort by decreasing score
doc_score_pairs = sorted(doc_score_pairs, key=lambda x: x[1], reverse=True)

#Output passages & scores
for doc, score in doc_score_pairs:
    print(score.item(), doc)

In [346]:
tokenizer.save_pretrained('tokenizer')

('tokenizer\\tokenizer_config.json',
 'tokenizer\\special_tokens_map.json',
 'tokenizer\\vocab.txt',
 'tokenizer\\added_tokens.json',
 'tokenizer\\tokenizer.json')

In [347]:
model.save_pretrained('model')

In [337]:
def cos_similarity(first_embeddings: torch.Tensor, second_embeddings: torch.Tensor) -> torch.Tensor:
    return util.cos_sim(first_embeddings, second_embeddings)

In [336]:
def mean_pooling(model_output: torch.Tensor, attention_mask: torch.Tensor) -> torch.Tensor:
    token_embeddings = model_output[0] #First element of model_output contains all token embeddings
    input_mask_expanded = attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float()
    sum_embeddings = torch.sum(token_embeddings * input_mask_expanded, 1)
    sum_mask = torch.clamp(input_mask_expanded.sum(1), min=1e-9)
    return sum_embeddings / sum_mask

In [333]:
from typing import List

def get_embedding(text: str | List[str], tokenizer, model) -> torch.Tensor:
    encoded_input = tokenizer(text, padding=True, truncation=True, max_length=16, return_tensors='pt')

    with torch.no_grad():
        model_output = model(**encoded_input)

    return mean_pooling(model_output, encoded_input['attention_mask'])

In [334]:
def lower_case(text: str) -> str:
    return text.lower()

In [335]:
from typing import Dict


def find_sentences(query: str, id2text: Dict[int, str], id2vec: torch.Tensor, tokenizer, model) -> str:
    query_emb = get_embedding(lower_case(query), tokenizer, model)
    return id2text[torch.argmax(cos_similarity(query_emb, id2vec)).item()]

## Чтение доков

In [97]:
from os import listdir

my_path = 'dataset'

path_tree = {
    my_path : dict()
}


for file_name in listdir(my_path):
    if '.txt' in file_name:
        path_tree[my_path][file_name] = 0
    else:
        path_tree[my_path][file_name] = dict()
        for second_file_name in listdir(f'{my_path}/{file_name}'):
            path_tree[my_path][file_name][second_file_name] = 0



In [98]:
path_tree

{'dataset': {'ИНСТРУКЦИЯ ПО ОРГАНИЗАЦИИ ДВИЖЕНИЯ ПОЕЗДОВ И МАНЕВРОВОЙ РАБОТЫ НА ЖЕЛЕЗНОДОРОЖНОМ ТРАНСПОРТЕ РОССИЙСКОЙ ФЕДЕРАЦИИ': {'Общие требования к организации движения поездов на железнодорожном транспорте.txt': 0},
  'ИНСТРУКЦИЯ ПО СИГНАЛИЗАЦИИ НА ЖЕЛЕЗНОДОРОЖНОМ ТРАНСПОРТЕ РОССИЙСКОЙ ФЕДЕРАЦИИ': {'Звуковые сигналы на железнодорожном транспорте.txt': 0,
   'Общие положения.txt': 0,
   'Правила применения семафоров.txt': 0,
   'Ручные сигналы на железнодорожном транспорте.txt': 0,
   'Светофоры на железнодорожном транспорте.txt': 0,
   'Сигналы на железнодорожном транспорте.txt': 0,
   'Сигналы ограждения на железнодорожном транспорте.txt': 0,
   'Сигналы тревоги и специальные указатели.txt': 0,
   'Сигналы, применяемые для обозначения поездов, локомотивов и другого железнодорожного подвижного состава.txt': 0,
   'Сигналы, применяемые при маневренной работе.txt': 0,
   'Сигнальные указатели и знаки на железнодорожном транспорте.txt': 0},
  'ОСНОВНЫЕ ПОЛОЖЕНИЯ О ПОРЯДКЕ ДВИЖЕНИЯ ДРЕ

In [137]:
import re

def parser(path: str) -> List[str]:
    with open(path, 'r', encoding = 'cp1251') as file:
        s = file.read()
        s = re.sub(" <\d+>", "", s)
        s = re.sub("<\d+> ", "", s)
        s = re.split("\n\d+\. ", s)
        s = [re.sub("\n", " ", line) for line in s]
        s = [re.sub("  ", "\n", line) for line in s]
        s[0] = s[0][3:]
        return s

## Создание текстового корпуса

In [138]:
text_corpus = []

for file_name, value in path_tree[my_path].items():
    next_path = f'{my_path}/{file_name}'
    if value == 0:
        text_corpus.extend(parser(next_path))
    else:
        for second_file_name in listdir(next_path):
            text_corpus.extend(parser(f'{next_path}/{second_file_name}'))


In [348]:
from joblib import dump


dump(text_corpus, 'text_corpus.pkl')

['text_corpus.pkl']

## Сделать эмбеддинги

In [338]:
id2text = {id: text for id, text in enumerate(text_corpus)}
id2vec = torch.vstack([get_embedding(lower_case(id2text[id]), tokenizer, model) for id in id2text.keys()])

In [349]:
dump(id2text, 'id2text.pkl')

['id2text.pkl']

In [350]:
dump(id2text, 'id2vec.pkl')

['id2vec.pkl']

In [339]:
id2vec.size()

torch.Size([917, 1024])

## Тестирование

In [340]:
id = 52
id2text[id]

'На отдельных линиях (участках) общего пользования и железнодорожных путях необщего пользования движение поездов допускается:\nпо приказам диспетчера поездного, передаваемым машинисту ведущего локомотива по устройствам технологической железнодорожной электросвязи;\nпосредством одного жезла;\nпосредством одного локомотива.\nПри совпадении границы двух железнодорожных станций, а также на железнодорожных путях необщего пользования, не имеющих раздельных пунктов, допускается применять маневровый порядок движения.\nПеречень участков и железнодорожных станций с указанием порядка организации движения в случаях, перечисленных в настоящем пункте, устанавливается локальным нормативным актом владельца инфраструктуры (владельца железнодорожных путей необщего пользования). '

In [342]:
query = 'Что можно делать на линиях общего пользования?'
find_sentences(query, id2text, id2vec, tokenizer, model)

'Передвигать отдельные вагоны вручную на главных, приемоотправочных и сортировочных железнодорожных путях железнодорожной станции не допускается.\nПередвигать отдельные вагоны вручную допускается на прочих железнодорожных путях при отсутствии уклона, под руководством ответственного лица, выделенного владельцем инфраструктуры (владельцем железнодорожных путей необщего пользования), и в количестве не более одного груженого или двух порожних вагонов.\nПри передвижениях вагонов вручную не допускается:\n1) передвигать их со скоростью более 3 км/ч (вагоны должны быть сцеплены);\n2) перемещать их за предельный столбик в направлении главных и приемоотправочных железнодорожных путей;\n3) начинать передвижение, не имея тормозных башмаков;\n4) подкладывать для торможения под колеса предметы, не предусмотренные в техническо-распорядительном акте, а в случае его отсутствия на железнодорожных путях необщего пользования - в локальном нормативном акте владельца железнодорожных путей необщего пользован